chenboat commented on a change in pull request #4914: [POC] By-passing 
deep-store requirement for Realtime segment completion
URL: https://github.com/apache/incubator-pinot/pull/4914#discussion_r398959989
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 ##########
 @@ -257,21 +280,78 @@ public void downloadAndReplaceSegment(String 
segmentName, LLCRealtimeSegmentZKMe
     final File segmentFolder = new File(_indexDir, segmentName);
     FileUtils.deleteQuietly(segmentFolder);
     try {
-      
SegmentFetcherFactory.getInstance().getSegmentFetcherBasedOnURI(uri).fetchSegmentToLocal(uri,
 tempFile);
-      _logger.info("Downloaded file from {} to {}; Length of downloaded file: 
{}", uri, tempFile, tempFile.length());
+      boolean downloadResult = downloadFromURI(uri, tempFile);
+      if (!downloadResult) {
+         String peerServerUri = getPeerServerURI(segmentName);
+         if (peerServerUri == null || !downloadFromURI(peerServerUri, 
tempFile)) {
+           _logger.warn("Download segment {} from {} failed.", segmentName, 
peerServerUri);
+           return;
+         }
+      }
       TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
-      _logger.info("Uncompressed file {} into tmp dir {}", tempFile, 
tempSegmentFolder);
+      _logger.warn("Uncompressed file {} into tmp dir {}", tempFile, 
tempSegmentFolder);
       FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
-      _logger.info("Replacing LLC Segment {}", segmentName);
+      _logger.warn("Replacing LLC Segment {}", segmentName);
       replaceLLSegment(segmentName, indexLoadingConfig);
     } catch (Exception e) {
+      _logger.error("Failed to download segment {}.", e.getMessage());
       throw new RuntimeException(e);
     } finally {
       FileUtils.deleteQuietly(tempFile);
       FileUtils.deleteQuietly(tempSegmentFolder);
     }
   }
 
+  // Return the address of an ONLINE server hosting a segment.
+  private String getPeerServerURI(String segmentName) {
+    ExternalView externalViewForResource = 
HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, 
_tableNameWithType);
+    if (externalViewForResource == null ) {
+      _logger.warn("External View not found for segment {}", segmentName);
+      return null;
+    }
+    // Find out the ONLINE server serving the segment.
+    for(String segment : externalViewForResource.getPartitionSet()) {
+      if (!segmentName.equals(segment)) {
+        continue;
+      }
+
+      Map<String, String> instanceToStateMap = 
externalViewForResource.getStateMap(segmentName);
+      for (Map.Entry<String, String> instanceState : 
instanceToStateMap.entrySet()) {
+        if ("ONLINE".equals(instanceState.getValue())) {
+          _logger.info("Found ONLINE server {} for segment {}.", 
instanceState.getKey(), segmentName);
+          String instanceId = instanceState.getKey();
+
+          String namePortStr = 
instanceId.split(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)[1];
+          String hostName = namePortStr.split("_")[0];
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to