chenboat commented on a change in pull request #4914: [POC] By-passing
deep-store requirement for Realtime segment completion
URL: https://github.com/apache/incubator-pinot/pull/4914#discussion_r388023936
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -257,21 +280,78 @@ public void downloadAndReplaceSegment(String segmentName, LLCRealtimeSegmentZKMe
final File segmentFolder = new File(_indexDir, segmentName);
FileUtils.deleteQuietly(segmentFolder);
try {
- SegmentFetcherFactory.getInstance().getSegmentFetcherBasedOnURI(uri).fetchSegmentToLocal(uri, tempFile);
- _logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", uri, tempFile, tempFile.length());
+ boolean downloadResult = downloadFromURI(uri, tempFile);
+ if (!downloadResult) {
+ String peerServerUri = getPeerServerURI(segmentName);
+ if (peerServerUri == null || !downloadFromURI(peerServerUri,
tempFile)) {
+ _logger.warn("Download segment {} from {} failed.", segmentName,
peerServerUri);
+ return;
+ }
+ }
TarGzCompressionUtils.unTar(tempFile, tempSegmentFolder);
- _logger.info("Uncompressed file {} into tmp dir {}", tempFile,
tempSegmentFolder);
+ _logger.warn("Uncompressed file {} into tmp dir {}", tempFile,
tempSegmentFolder);
FileUtils.moveDirectory(tempSegmentFolder.listFiles()[0], segmentFolder);
- _logger.info("Replacing LLC Segment {}", segmentName);
+ _logger.warn("Replacing LLC Segment {}", segmentName);
replaceLLSegment(segmentName, indexLoadingConfig);
} catch (Exception e) {
+ _logger.error("Failed to download segment {}.", e.getMessage());
throw new RuntimeException(e);
} finally {
FileUtils.deleteQuietly(tempFile);
FileUtils.deleteQuietly(tempSegmentFolder);
}
}
+ // Return the address of an ONLINE server hosting a segment.
+ private String getPeerServerURI(String segmentName) {
+ ExternalView externalViewForResource =
HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName,
_tableNameWithType);
+ if (externalViewForResource == null ) {
+ _logger.warn("External View not found for segment {}", segmentName);
+ return null;
+ }
+ // Find out the ONLINE server serving the segment.
+ for(String segment : externalViewForResource.getPartitionSet()) {
+ if (!segmentName.equals(segment)) {
+ continue;
+ }
+
+ Map<String, String> instanceToStateMap =
externalViewForResource.getStateMap(segmentName);
+ for (Map.Entry<String, String> instanceState :
instanceToStateMap.entrySet()) {
+ if ("ONLINE".equals(instanceState.getValue())) {
+ _logger.info("Found ONLINE server {} for segment {}.",
instanceState.getKey(), segmentName);
+ String instanceId = instanceState.getKey();
+
+ String namePortStr =
instanceId.split(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)[1];
+ String hostName = namePortStr.split("_")[0];
+
+ Map<String, String> instanceConfig =
+ HelixHelper.getInstanceConfigsMapFor(instanceId, _clusterName,
_helixAdmin);
+ int port;
+ try {
+ port =
Integer.parseInt(instanceConfig.get(CommonConstants.Helix.Instance.ADMIN_PORT_KEY));
+ } catch (Exception e) {
+ port = CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT;
+ }
+
+ return StringUtil.join("/", "http://" + hostName + ":" + port,
Review comment:
done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]