tibrewalpratik17 commented on code in PR #14506:
URL: https://github.com/apache/pinot/pull/14506#discussion_r1851836657
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1555,26 +1555,38 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
segmentName));
}
- // Randomly ask one server to upload
- URI uri =
peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
- String serverUploadRequestUrl = StringUtil.join("/", uri.toString(),
"upload");
- serverUploadRequestUrl =
- String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl,
_deepstoreUploadRetryTimeoutMs);
- LOGGER.info("Ask server to upload LLC segment {} to deep store by
this path: {}", segmentName,
- serverUploadRequestUrl);
- String tempSegmentDownloadUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
- String segmentDownloadUrl =
- moveSegmentFile(rawTableName, segmentName,
tempSegmentDownloadUrl, pinotFS);
- LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, segmentDownloadUrl);
-
- // Update segment ZK metadata by adding the download URL
- segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
- // TODO: add version check when persist segment ZK metadata
- persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info("Successfully uploaded LLC segment {} to deep store with
download url: {}", segmentName,
- segmentDownloadUrl);
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS,
1L);
+ int iteration = 0;
+ // Round robin the servers until we find the one with the correct
crc and successful upload
+ // If server is the last valid URI left then skip crc check as
deepstore copy reliability takes a higher
+ // priority
+ for (URI uri: peerSegmentURIs) {
+ String serverUploadRequestUrl = StringUtil.join("/",
uri.toString(), "upload");
+ serverUploadRequestUrl =
+ String.format("%s?uploadTimeoutMs=%d&expectedCrc=%d",
serverUploadRequestUrl,
+ _deepstoreUploadRetryTimeoutMs,
+ (iteration == (peerSegmentURIs.size() - 1) ? -1 :
segmentZKMetadata.getCrc()));
+ LOGGER.info("Ask server {} to upload LLC segment {} to deep store
by this path: {}", uri, segmentName,
+ serverUploadRequestUrl);
+ String tempSegmentDownloadUrl;
+ try {
+ tempSegmentDownloadUrl =
_fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to upload LLC segment {} to deepstore from
server {}", segmentName, uri);
+ iteration++;
+ continue;
+ }
+
+ String segmentDownloadUrl = moveSegmentFile(rawTableName,
segmentName, tempSegmentDownloadUrl, pinotFS);
+ // Update segment ZK metadata by adding the download URL
+ LOGGER.info("Updating segment {} download url in ZK to be {}",
segmentName, segmentDownloadUrl);
+ segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
+ // TODO: add version check when persist segment ZK metadata
+ persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
+ LOGGER.info("Successfully uploaded LLC segment {} to deep store
with download url: {}", segmentName,
Review Comment:
Yeah, my bad. Fixed now!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]