Jackie-Jiang commented on code in PR #12017:
URL: https://github.com/apache/pinot/pull/12017#discussion_r1408522908
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
- @Test
+ @Test(timeOut = 30_000L)
Review Comment:
```suggestion
@Test
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -914,9 +916,9 @@ public void testCommitSegmentMetadata() {
/**
* Test cases for fixing LLC segment by uploading to segment store if missing
*/
- @Test
+ @Test(timeOut = 30_000L)
public void testUploadToSegmentStore()
- throws HttpErrorStatusException, IOException, URISyntaxException {
+ throws HttpErrorStatusException, IOException, URISyntaxException,
InterruptedException {
Review Comment:
```suggestion
throws HttpErrorStatusException, IOException, URISyntaxException {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
continue;
}
// Skip the fix for the segment if it is already out of retention.
- if (retentionStrategy != null &&
retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
- LOGGER.info("Skipped deep store uploading of LLC segment {} which is
already out of retention", segmentName);
+ if (retentionStrategy.isPurgeable(realtimeTableName,
segmentZKMetadata)) {
+ LOGGER.info("Skipped deep store uploading of LLC segment {} which is
already out of retention",
+ segmentName);
continue;
}
- LOGGER.info("Fixing LLC segment {} whose deep store copy is
unavailable", segmentName);
-
- // Find servers which have online replica
- List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
CommonConstants.HTTP_PROTOCOL, _helixManager);
- if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(
- String.format("Failed to upload segment %s to deep store because
no online replica is found",
- segmentName));
- }
-
- // Randomly ask one server to upload
- URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
- String serverUploadRequestUrl = StringUtil.join("/", uri.toString(),
"upload");
- serverUploadRequestUrl =
- String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl,
_deepstoreUploadRetryTimeoutMs);
- LOGGER.info("Ask server to upload LLC segment {} to deep store by this
path: {}", segmentName,
- serverUploadRequestUrl);
- String tempSegmentDownloadUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
- String segmentDownloadUrl =
- moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl,
pinotFS);
- LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, segmentDownloadUrl);
- // Update segment ZK metadata by adding the download URL
- segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
- // TODO: add version check when persist segment ZK metadata
- persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info("Successfully uploaded LLC segment {} to deep store with
download url: {}", segmentName,
- segmentDownloadUrl);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
- LOGGER.error("Failed to upload segment {} to deep store", segmentName,
e);
+ LOGGER.warn("Failed checking segment deep store URL for segment {}",
segmentName);
+ }
+
+ // Skip the fix if an upload is already queued for this segment
+ if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+ continue;
}
+ _deepStoreUploadExecutorPendingSegments.add(segmentName);
Review Comment:
Should we also update the queue size here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1405,48 +1414,77 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
continue;
}
// Skip the fix for the segment if it is already out of retention.
- if (retentionStrategy != null &&
retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
- LOGGER.info("Skipped deep store uploading of LLC segment {} which is
already out of retention", segmentName);
+ if (retentionStrategy.isPurgeable(realtimeTableName,
segmentZKMetadata)) {
+ LOGGER.info("Skipped deep store uploading of LLC segment {} which is
already out of retention",
+ segmentName);
continue;
}
- LOGGER.info("Fixing LLC segment {} whose deep store copy is
unavailable", segmentName);
-
- // Find servers which have online replica
- List<URI> peerSegmentURIs =
- PeerServerSegmentFinder.getPeerServerURIs(segmentName,
CommonConstants.HTTP_PROTOCOL, _helixManager);
- if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(
- String.format("Failed to upload segment %s to deep store because
no online replica is found",
- segmentName));
- }
-
- // Randomly ask one server to upload
- URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
- String serverUploadRequestUrl = StringUtil.join("/", uri.toString(),
"upload");
- serverUploadRequestUrl =
- String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl,
_deepstoreUploadRetryTimeoutMs);
- LOGGER.info("Ask server to upload LLC segment {} to deep store by this
path: {}", segmentName,
- serverUploadRequestUrl);
- String tempSegmentDownloadUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
- String segmentDownloadUrl =
- moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl,
pinotFS);
- LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, segmentDownloadUrl);
- // Update segment ZK metadata by adding the download URL
- segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
- // TODO: add version check when persist segment ZK metadata
- persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info("Successfully uploaded LLC segment {} to deep store with
download url: {}", segmentName,
- segmentDownloadUrl);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
} catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
- LOGGER.error("Failed to upload segment {} to deep store", segmentName,
e);
+ LOGGER.warn("Failed checking segment deep store URL for segment {}",
segmentName);
+ }
+
+ // Skip the fix if an upload is already queued for this segment
+ if (_deepStoreUploadExecutorPendingSegments.contains(segmentName)) {
+ continue;
}
+ _deepStoreUploadExecutorPendingSegments.add(segmentName);
Review Comment:
(minor)
```suggestion
// Skip the fix if an upload is already queued for this segment
if (!_deepStoreUploadExecutorPendingSegments.add(segmentName)) {
continue;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]