chenboat commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r599980587
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager
pinotHelixResourceManager, Controlle
_controllerMetrics = controllerMetrics;
}
- public void completeSegmentOperations(String rawTableName, SegmentMetadata
segmentMetadata,
+ public void completeSegmentOperations(String tableNameWithType,
SegmentMetadata segmentMetadata,
URI finalSegmentLocationURI, File currentSegmentLocation, boolean
enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI, boolean
moveSegmentToFinalLocation, String crypter)
throws Exception {
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
String segmentName = segmentMetadata.getName();
-
- // Brand new segment, not refresh, directly add the segment
- ZNRecord segmentMetadataZnRecord =
-
_pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName,
segmentName);
+ ZNRecord segmentMetadataZnRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType,
segmentName);
if (segmentMetadataZnRecord == null) {
- LOGGER.info("Adding new segment {} from table {}", segmentName,
rawTableName);
+ LOGGER.info("Adding new segment {} from table {}", segmentName,
tableNameWithType);
processNewSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation, zkDownloadURI, crypter,
- rawTableName, segmentName, moveSegmentToFinalLocation);
+ tableNameWithType, segmentName, moveSegmentToFinalLocation);
return;
}
- LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, rawTableName);
+ LOGGER.info("Segment {} from table {} already exists, refreshing if
necessary", segmentName, tableNameWithType);
processExistingSegment(segmentMetadata, finalSegmentLocationURI,
currentSegmentLocation,
- enableParallelPushProtection, headers, zkDownloadURI, crypter,
offlineTableName, segmentName,
+ enableParallelPushProtection, headers, zkDownloadURI, crypter,
tableNameWithType, segmentName,
segmentMetadataZnRecord, moveSegmentToFinalLocation);
}
private void processExistingSegment(SegmentMetadata segmentMetadata, URI
finalSegmentLocationURI,
File currentSegmentLocation, boolean enableParallelPushProtection,
HttpHeaders headers, String zkDownloadURI,
- String crypter, String offlineTableName, String segmentName, ZNRecord
znRecord,
+ String crypter, String tableNameWithType, String segmentName, ZNRecord
znRecord,
boolean moveSegmentToFinalLocation)
throws Exception {
OfflineSegmentZKMetadata existingSegmentZKMetadata = new
OfflineSegmentZKMetadata(znRecord);
Review comment:
For processExistingSegment(), I think we should leave it to another PR
and let this PR focus on adding new segments. For now, this PR will reject
uploads of segments whose name matches an existing segment. Note that for our
Upsert table use case, uploading new segments would be sufficient. I will
open a follow-up PR for refreshing existing segments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]