chenboat commented on a change in pull request #4914: [POC] By-passing deep-store requirement for Realtime segment completion
URL: https://github.com/apache/incubator-pinot/pull/4914#discussion_r398959989
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java ########## @@ -257,21 +280,78 @@ public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMe final File segmentFolder = new File(_indexDir, segmentName); FileUtils.deleteQuietly(segmentFolder); try { - SegmentFetcherFactory.getInstance().getSegmentFetcherBasedOnURI(uri).fetchSegmentToLocal(uri, tempFile); - _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length()); + boolean downloadResult = downloadFromURI(uri, tempFile); + if (!downloadResult) { + String peerServerUri = getPeerServerURI(segmentName); + if (peerServerUri == null || !downloadFromURI(peerServerUri, tempFile)) { + _logger.warn("Download segment {} from {} failed.", segmentName, peerServerUri); + return; + } + } TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder); - _logger.info("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder); + _logger.warn("Uncompressed file {} into tmp dir {}", tempFile, tempSegmentFolder); FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder); - _logger.info("Replacing LLC Segment {}", segmentName); + _logger.warn("Replacing LLC Segment {}", segmentName); replaceLLSegment(segmentName, indexLoadingConfig); } catch (Exception e) { + _logger.error("Failed to download segment {}.", e.getMessage()); throw new RuntimeException(e); } finally { FileUtils.deleteQuietly(tempFile); FileUtils.deleteQuietly(tempSegmentFolder); } } + // Return the address of an ONLINE server hosting a segment. + private String getPeerServerURI(String segmentName) { + ExternalView externalViewForResource = HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, _tableNameWithType); + if (externalViewForResource == null ) { + _logger.warn("External View not found for segment {}", segmentName); + return null; + } + // Find out the ONLINE server serving the segment. 
+ for(String segment : externalViewForResource.getPartitionSet()) { + if (!segmentName.equals(segment)) { + continue; + } + + Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName); + for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) { + if ("ONLINE".equals(instanceState.getValue())) { + _logger.info("Found ONLINE server {} for segment {}.", instanceState.getKey(), segmentName); + String instanceId = instanceState.getKey(); + + String namePortStr = instanceId.split(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)[1]; + String hostName = namePortStr.split("_")[0]; Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org