Jackie-Jiang commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r598967441
##########
File path:
pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
##########
@@ -118,6 +125,47 @@ public static void fetchSegmentToLocal(String uri, File dest)
     fetchSegmentToLocal(new URI(uri), dest);
   }
 
+  /**
+   * Fetches a segment from a given URI and untar the segment file to the dest dir (i.e., tableDataDir + segmentName).
+   */
+  public static void fetchAndUntarSegmentToLocal(String uri, File tableDataDir, String segmentName)
Review comment:
Why do we need this method? There is already an `untarAndMoveSegment()` within the `RealtimeTableDataManager` class; we should be able to reuse that.
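
For context, a minimal sketch of the fetch-then-untar flow such a method would cover. This is not the PR's actual implementation: the temp-file layout and the commented-out fetch call are assumptions, and reusing `untarAndMoveSegment()` as suggested above would replace the untar loop entirely.

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

public class FetchAndUntarSketch {

  public static void fetchAndUntarSegmentToLocal(String uri, File tableDataDir, String segmentName)
      throws Exception {
    // Download the tarred segment into a temporary file first (location is an assumption).
    File tarFile = new File(tableDataDir, segmentName + ".tar.gz");
    // SegmentFetcherFactory.fetchSegmentToLocal(uri, tarFile);  // existing method in this class

    // Untar into tableDataDir/segmentName, the destination described in the javadoc.
    // Note: a real implementation should also guard against entry names escaping segmentDir.
    File segmentDir = new File(tableDataDir, segmentName);
    try (TarArchiveInputStream tarIn = new TarArchiveInputStream(
        new GzipCompressorInputStream(new BufferedInputStream(new FileInputStream(tarFile))))) {
      TarArchiveEntry entry;
      while ((entry = tarIn.getNextTarEntry()) != null) {
        File outFile = new File(segmentDir, entry.getName());
        if (entry.isDirectory()) {
          outFile.mkdirs();
        } else {
          outFile.getParentFile().mkdirs();
          Files.copy(tarIn, outFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
      }
    } finally {
      // Clean up the intermediate tar file.
      tarFile.delete();
    }
  }
}
```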
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -57,41 +57,37 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-
-    // Brand new segment, not refresh, directly add the segment
-    ZNRecord segmentMetadataZnRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+    ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          tableNameWithType, segmentName, moveSegmentToFinalLocation);
       return;
     }
 
-    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
+    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);
 
     processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation,
-        enableParallelPushProtection, headers, zkDownloadURI, crypter, offlineTableName, segmentName,
+        enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName,
         segmentMetadataZnRecord, moveSegmentToFinalLocation);
   }
 
   private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI,
       File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String zkDownloadURI,
-      String crypter, String offlineTableName, String segmentName, ZNRecord znRecord,
+      String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord,
       boolean moveSegmentToFinalLocation)
       throws Exception {
     OfflineSegmentZKMetadata existingSegmentZKMetadata = new OfflineSegmentZKMetadata(znRecord);
Review comment:
(Critical) For a real-time segment, this should not be `OfflineSegmentZKMetadata`; otherwise the `status` and `offset` info are lost.
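
A minimal sketch of the kind of type-aware wrapping this comment asks for, assuming the 0.7-era metadata classes (`SegmentZKMetadata`, `RealtimeSegmentZKMetadata`, `OfflineSegmentZKMetadata`) and `TableNameBuilder.isRealtimeTableResource()`; the exact packages and constructors should be verified against the code base.

```java
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;

class SegmentZKMetadataSelector {

  // Wrap the ZNRecord with the metadata class matching the table type so that
  // realtime-only fields (e.g. status, offsets) are preserved on refresh.
  static SegmentZKMetadata fromZNRecord(String tableNameWithType, ZNRecord znRecord) {
    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
      return new RealtimeSegmentZKMetadata(znRecord);
    }
    return new OfflineSegmentZKMetadata(znRecord);
  }
}
```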
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
##########
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.assignment.segment;
import com.google.common.base.Preconditions;
+
Review comment:
(nit) reformat
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -403,10 +414,13 @@ private void decryptFile(String crypterClassName, File tempEncryptedFile, File t
   // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
   public void uploadSegmentAsJson(String segmentJsonStr,
       @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
+      @ApiParam(value = "Type of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) @DefaultValue("OFFLINE") String tableType,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false")
       @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
+      asyncResponse.resume(
+          uploadSegment(tableName, "OFFLINE".equalsIgnoreCase(tableType) ? TableType.OFFLINE : TableType.REALTIME, null,
Review comment:
Use `TableType.valueOf(tableType.toUpperCase())`? We don't want to upload a segment to the real-time table when given an arbitrary `tableType`. Same applies to the other places.