pradeeee commented on code in PR #18641:
URL: https://github.com/apache/pinot/pull/18641#discussion_r3448484785


##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java:
##########
@@ -107,6 +111,11 @@ public PredownloadScheduler(PropertiesConfiguration 
properties)
     _failedSegments = ConcurrentHashMap.newKeySet();
     _executor = Executors.newFixedThreadPool(predownloadParallelism);
     LOGGER.info("Created thread pool with num of threads: {}", 
predownloadParallelism);
+
+    _peerDownloadEnabled = peerDownloadEnabled;
+    _peerDownloadScheme = 
_instanceDataManagerConfig.getSegmentPeerDownloadScheme();

Review Comment:
   Done.



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java:
##########
@@ -336,6 +355,52 @@ void downloadSegment(PredownloadSegmentInfo 
predownloadSegmentInfo)
     }
   }
 
+  private void downloadFromDeepStore(PredownloadSegmentInfo 
predownloadSegmentInfo)
+      throws Exception {
+    File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo);
+    if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar()
+        && predownloadSegmentInfo.getCrypterName() == null) {
+      try {
+        File untaredSegDir = 
downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir,
+            
_instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
+        moveSegment(predownloadSegmentInfo, untaredSegDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    } else {
+      try {
+        File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir);
+        untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    }
+  }
+
+  private void downloadFromPeers(PredownloadSegmentInfo predownloadSegmentInfo)

Review Comment:
   Implemented StremUntar flow, it will be preferred, if not enabled then 
fallback is download -> untar -> move.
   



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java:
##########
@@ -107,6 +112,18 @@ public PredownloadScheduler(PropertiesConfiguration 
properties)
     _failedSegments = ConcurrentHashMap.newKeySet();
     _executor = Executors.newFixedThreadPool(predownloadParallelism);
     LOGGER.info("Created thread pool with num of threads: {}", 
predownloadParallelism);
+
+    _peerDownloadEnabled = 
properties.getBoolean("pinot.server.peer.download.enabled", false);

Review Comment:
   Updated the code, declared inside the configs.



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java:
##########
@@ -336,6 +355,52 @@ void downloadSegment(PredownloadSegmentInfo 
predownloadSegmentInfo)
     }
   }
 
+  private void downloadFromDeepStore(PredownloadSegmentInfo 
predownloadSegmentInfo)
+      throws Exception {
+    File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo);
+    if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar()
+        && predownloadSegmentInfo.getCrypterName() == null) {
+      try {
+        File untaredSegDir = 
downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir,
+            
_instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
+        moveSegment(predownloadSegmentInfo, untaredSegDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    } else {
+      try {
+        File tarFile = downloadAndDecrypt(predownloadSegmentInfo, tempRootDir);
+        untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir);
+      } finally {
+        FileUtils.deleteQuietly(tempRootDir);
+      }
+    }
+  }
+
+  private void downloadFromPeers(PredownloadSegmentInfo predownloadSegmentInfo)

Review Comment:
   Added below metrics to track the gain from peer download:
   
   PREDOWNLOAD_DEEPSTORE_DOWNLOAD_COUNT - counter for deep store download 
success
   PREDOWNLOAD_PEER_SEGMENT_DOWNLOAD_COUNT - counter for peer download success
   PREDOWNLOAD_PEER_SEGMENT_DOWNLOAD_FAILURE_COUNT - counter for peer download 
failure (with segment name as key)
   PEER_DOWNLOAD_SPEED - gauge for peer download speed (MB/s)



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadMetrics.java:
##########
@@ -47,6 +47,20 @@ public void segmentDownloaded(boolean succeed, String 
segmentName, long segmentS
     }
   }
 
+  public void peerSegmentDownloaded(boolean succeed, String segmentName, long 
segmentSizeBytes, long downloadTimeMs) {
+    if (succeed) {
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.PREDOWNLOAD_PEER_SEGMENT_DOWNLOAD_COUNT,
 1);
+      _serverMetrics.setValueOfGlobalGauge(ServerGauge.PEER_DOWNLOAD_SPEED,

Review Comment:
   Added the change.



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadMetrics.java:
##########
@@ -47,6 +47,20 @@ public void segmentDownloaded(boolean succeed, String 
segmentName, long segmentS
     }
   }
 
+  public void peerSegmentDownloaded(boolean succeed, String segmentName, long 
segmentSizeBytes, long downloadTimeMs) {
+    if (succeed) {
+      
_serverMetrics.addMeteredGlobalValue(ServerMeter.PREDOWNLOAD_PEER_SEGMENT_DOWNLOAD_COUNT,
 1);
+      _serverMetrics.setValueOfGlobalGauge(ServerGauge.PEER_DOWNLOAD_SPEED,
+          (segmentSizeBytes / BYTES_TO_MB) / (downloadTimeMs / 1000 + 1));
+    } else {
+      
_serverMetrics.addMeteredValue(ServerMeter.PREDOWNLOAD_PEER_SEGMENT_DOWNLOAD_FAILURE_COUNT,
 1, segmentName);

Review Comment:
   Dropped it. 



##########
pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java:
##########
@@ -301,23 +318,25 @@ void downloadSegment(PredownloadSegmentInfo 
predownloadSegmentInfo)
       throws Exception {
     try {
       long startTime = System.currentTimeMillis();
-      File tempRootDir = getTmpSegmentDataDir(predownloadSegmentInfo);
-      if (_instanceDataManagerConfig.isStreamSegmentDownloadUntar()
-          && predownloadSegmentInfo.getCrypterName() == null) {
-        try {
-          // TODO: increase rate limit here
-          File untaredSegDir = 
downloadAndStreamUntarWithRateLimit(predownloadSegmentInfo, tempRootDir,
-              
_instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit());
-          moveSegment(predownloadSegmentInfo, untaredSegDir);
-        } finally {
-          FileUtils.deleteQuietly(tempRootDir);
-        }
-      } else {
-        try {
-          File tarFile = downloadAndDecrypt(predownloadSegmentInfo, 
tempRootDir);
-          untarAndMoveSegment(predownloadSegmentInfo, tarFile, tempRootDir);
-        } finally {
-          FileUtils.deleteQuietly(tempRootDir);
+      try {
+        downloadFromDeepStore(predownloadSegmentInfo);
+        _predownloadMetrics.deepStoreSegmentDownloaded();
+      } catch (Exception e) {
+        if (_peerDownloadEnabled && _peerDownloadScheme != null) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to