Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1408522908


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
-  @Test
+  @Test(timeOut = 30_000L)

Review Comment:
   ```suggestion
     @Test
   ```



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
   /**
    * Test cases for fixing LLC segment by uploading to segment store if missing
    */
-  @Test
+  @Test(timeOut = 30_000L)
   public void testUploadToSegmentStore()
-      throws HttpErrorStatusException, IOException, URISyntaxException {
+      throws HttpErrorStatusException, IOException, URISyntaxException, 
InterruptedException {

Review Comment:
   ```suggestion
         throws HttpErrorStatusException, IOException, URISyntaxException {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig 
tableConfig, List<SegmentZKMe
           continue;
         }
         // Skip the fix for the segment if it is already out of retention.
-        if (retentionStrategy != null && 
retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
-          LOGGER.info("Skipped deep store uploading of LLC segment {} which is 
already out of retention", segmentName);
+        if (retentionStrategy.isPurgeable(realtimeTableName, 
segmentZKMetadata)) {
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is 
already out of retention",
+              segmentName);
           continue;
         }
-        LOGGER.info("Fixing LLC segment {} whose deep store copy is 
unavailable", segmentName);
-
-        // Find servers which have online replica
-        List<URI> peerSegmentURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
CommonConstants.HTTP_PROTOCOL, _helixManager);
-        if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(
-              String.format("Failed to upload segment %s to deep store because 
no online replica is found",
-                  segmentName));
-        }
-
-        // Randomly ask one server to upload
-        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
-        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
-        serverUploadRequestUrl =
-            String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, 
_deepstoreUploadRetryTimeoutMs);
-        LOGGER.info("Ask server to upload LLC segment {} to deep store by this 
path: {}", segmentName,
-            serverUploadRequestUrl);
-        String tempSegmentDownloadUrl = 
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
-        String segmentDownloadUrl =
-            moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, 
pinotFS);
-        LOGGER.info("Updating segment {} download url in ZK to be {}", 
segmentName, segmentDownloadUrl);
-        // Update segment ZK metadata by adding the download URL
-        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
-        // TODO: add version check when persist segment ZK metadata
-        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
-        LOGGER.info("Successfully uploaded LLC segment {} to deep store with 
download url: {}", segmentName,
-            segmentDownloadUrl);
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
       } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
-        LOGGER.error("Failed to upload segment {} to deep store", segmentName, 
e);
+        LOGGER.warn("Failed checking segment deep store URL for segment {}", 
segmentName);
+      }
+
+      // Skip the fix if an upload is already queued for this segment
+      if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+        continue;
       }
+      _deepStoreUploadExecutorPendingSegments.add(segmentName);

Review Comment:
   Should we also update the queue size here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig 
tableConfig, List<SegmentZKMe
           continue;
         }
         // Skip the fix for the segment if it is already out of retention.
-        if (retentionStrategy != null && 
retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
-          LOGGER.info("Skipped deep store uploading of LLC segment {} which is 
already out of retention", segmentName);
+        if (retentionStrategy.isPurgeable(realtimeTableName, 
segmentZKMetadata)) {
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is 
already out of retention",
+              segmentName);
           continue;
         }
-        LOGGER.info("Fixing LLC segment {} whose deep store copy is 
unavailable", segmentName);
-
-        // Find servers which have online replica
-        List<URI> peerSegmentURIs =
-            PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
CommonConstants.HTTP_PROTOCOL, _helixManager);
-        if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(
-              String.format("Failed to upload segment %s to deep store because 
no online replica is found",
-                  segmentName));
-        }
-
-        // Randomly ask one server to upload
-        URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
-        String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), 
"upload");
-        serverUploadRequestUrl =
-            String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, 
_deepstoreUploadRetryTimeoutMs);
-        LOGGER.info("Ask server to upload LLC segment {} to deep store by this 
path: {}", segmentName,
-            serverUploadRequestUrl);
-        String tempSegmentDownloadUrl = 
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
-        String segmentDownloadUrl =
-            moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, 
pinotFS);
-        LOGGER.info("Updating segment {} download url in ZK to be {}", 
segmentName, segmentDownloadUrl);
-        // Update segment ZK metadata by adding the download URL
-        segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
-        // TODO: add version check when persist segment ZK metadata
-        persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
-        LOGGER.info("Successfully uploaded LLC segment {} to deep store with 
download url: {}", segmentName,
-            segmentDownloadUrl);
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
       } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
-        LOGGER.error("Failed to upload segment {} to deep store", segmentName, 
e);
+        LOGGER.warn("Failed checking segment deep store URL for segment {}", 
segmentName);
+      }
+
+      // Skip the fix if an upload is already queued for this segment
+      if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+        continue;
       }
+      _deepStoreUploadExecutorPendingSegments.add(segmentName);

Review Comment:
   (minor)
   ```suggestion
         // Skip the fix if an upload is already queued for this segment
         if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
           continue;
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to