klsince commented on code in PR #13597:
URL: https://github.com/apache/pinot/pull/13597#discussion_r1678288823
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -555,6 +750,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
   }
 }
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segmentList")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Cluster.UPLOAD_SEGMENT)
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")
Review Comment:
Update the notes to "Upload a batch of segments".

High-level question: is this batch upload API atomic? I see there is logic to clean up the uploaded segments upon any failure in the middle, so it's atomic in terms of the upload action itself, but I assume it's not atomic with respect to ongoing queries, since I didn't see SegmentLineage or a similar mechanism used here. Either way, it might be helpful to document in the API how things behave upon failures.
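
A minimal sketch of what the suggested annotation change could look like; the exact wording and the failure-semantics note are illustrative, not the PR's final API doc:

```java
// Hypothetical revision of the endpoint's Swagger annotation per the review:
// the notes spell out batch semantics and what happens on a mid-batch failure.
@ApiOperation(value = "Upload a batch of segments",
    notes = "Upload a batch of segments as binary using the METADATA upload type. If any segment in the batch "
        + "fails, the segments already uploaded by this request are cleaned up and the whole request fails. The "
        + "operation is not atomic with respect to ongoing queries, as no SegmentLineage-style mechanism is used.")
```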
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -403,6 +413,173 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
   }
 }
+  // Method used to upload a list of segments in batch mode with the METADATA upload type.
+  private SuccessResponse uploadSegments(String tableName, TableType tableType,
+      FormDataMultiPart multiParts, boolean enableParallelPushProtection,
+      boolean allowRefresh, HttpHeaders headers, Request request) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    String tableNameWithType = tableType == TableType.OFFLINE
+        ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)
+        : TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String clientAddress;
+    try {
+      clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
+    } catch (UnknownHostException ex) {
+      throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request",
+          Response.Status.BAD_REQUEST, ex);
+    }
+
+    String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
+    FileUploadType uploadType = getUploadType(uploadTypeStr);
+    if (!FileUploadType.METADATA.equals(uploadType)) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
+    String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);
+    ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
+    List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>();
+    List<File> tempEncryptedFiles = new ArrayList<>();
+    List<File> tempDecryptedFiles = new ArrayList<>();
+    List<File> tempSegmentDirs = new ArrayList<>();
+    List<String> segmentNames = new ArrayList<>();
+
+    for (BodyPart bodyPartFromReq : multiParts.getBodyParts()) {
+      FormDataBodyPart bodyPart = (FormDataBodyPart) bodyPartFromReq;
+      String segmentName = bodyPart.getContentDisposition().getFileName();
+      segmentNames.add(segmentName);
+      if (StringUtils.isEmpty(segmentName)) {
+        throw new ControllerApplicationException(LOGGER,
+            "filename is a required field within the multipart object for METADATA batch upload mode.",
+            Response.Status.BAD_REQUEST);
+      }
+      File tempEncryptedFile;
+      File tempDecryptedFile;
+      File tempSegmentDir;
+      try {
+        String sourceDownloadURIStr = extractHttpHeader(headers,
+            CommonConstants.Controller.SEGMENT_URI_HTTP_HEADER_PREFIX + segmentName);
+        if (StringUtils.isEmpty(sourceDownloadURIStr)) {
+          throw new ControllerApplicationException(LOGGER,
+              "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.",
+              Response.Status.BAD_REQUEST);
+        }
+        // The downloadUri for putting into segment zk metadata
+        String segmentDownloadURIStr = sourceDownloadURIStr;
+
+        String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
+        tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX);
+        tempEncryptedFiles.add(tempEncryptedFile);
+        tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName);
+        tempDecryptedFiles.add(tempDecryptedFile);
+        tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);
+        tempSegmentDirs.add(tempSegmentDir);
+        boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader);
+        File destFile = encryptSegment ? tempEncryptedFile : tempDecryptedFile;
+        // Override copySegmentToFinalLocation if an override is provided in headers: COPY_SEGMENT_TO_DEEP_STORE,
+        // else set to false for backward compatibility
+        String copySegmentToDeepStore =
+            extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
+        boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
+        createSegmentFileFromBodyPart(bodyPart, destFile);
+        // Include the un-tarred segment size when available
+        long segmentSizeInBytes = getSegmentSizeFromHeaders(segmentName, headers);
+        if (segmentSizeInBytes < 0) {
+          // Use the tarred segment size as an approximation.
+          segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr);
+        }
+        if (encryptSegment) {
+          decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile);
+        }
+
+        String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+        SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass);
+        LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, "
+            + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress,
+            ingestionDescriptor);
+
+        // Validate segment
+        if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) {
+          SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig);
+        }
+        long untarredSegmentSizeInBytes = 0L;
+        if (segmentSizeInBytes > 0) {
+          untarredSegmentSizeInBytes = segmentSizeInBytes;
+        }
+        SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig,
+            _controllerConf, _storageQuotaChecker);
+
+        // Encrypt segment
+        String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName();
+        Pair<String, File> encryptionInfo = encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile,
+            encryptSegment, crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType);
+        File segmentFile = encryptionInfo.getRight();
+
+        // Update the download URI if the controller is responsible for moving the segment to the deep store
+        URI finalSegmentLocationURI = null;
+        if (copySegmentToFinalLocation) {
+          URI dataDirURI = provider.getDataDirURI();
+          String dataDirPath = dataDirURI.toString();
+          String encodedSegmentName = URIUtils.encode(segmentName);
+          String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
+          if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
+            segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
+          } else {
+            segmentDownloadURIStr = finalSegmentLocationPath;
+          }
+          finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
+        }
+        SegmentUploadMetadata segmentUploadMetadata = new SegmentUploadMetadata(segmentDownloadURIStr,
+            sourceDownloadURIStr, finalSegmentLocationURI, segmentSizeInBytes, segmentMetadata, encryptionInfo);
+        segmentUploadMetadataList.add(segmentUploadMetadata);
+        LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
+            segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);
+      } catch (Exception ex) {
+        cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs);
+        _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+            segmentUploadMetadataList.size());
+        throw new ControllerApplicationException(LOGGER, "Exception while processing segments to upload: "
+            + ex.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, ex);
+      }
+    }
+
+    try {
+      ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
+      zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, allowRefresh,
+          headers, segmentUploadMetadataList);
+      return new SuccessResponse("Successfully uploaded segments: " + segmentNames + " of table: "
+          + tableNameWithType);
+    } catch (Exception ex) {
+      _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR,
+          segmentUploadMetadataList.size());
+      throw new ControllerApplicationException(LOGGER, "Exception while uploading segments: " + ex.getMessage(),
+          Response.Status.INTERNAL_SERVER_ERROR, ex);
+    } finally {
+      cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs);
Review Comment:
Why not combine the two try-catch blocks into one and share this `finally { cleanupTempFiles(...); }`?

Btw, would it be possible to process all segments inside one temp dir? Then we would only need to clean up that root temp dir at the end, with no need to track all the temp dirs used by the segments separately.
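
A rough sketch of the restructuring suggested above, assuming a single root temp dir per request; `batchTempDir` and the staging step are illustrative, and the fragment reuses the surrounding method's variables rather than being the PR's actual code:

```java
// Hypothetical shape of the combined flow: one root temp dir for the whole batch,
// one try block, and a single shared finally for cleanup.
File batchTempDir = new File(provider.getFileUploadTempDir(), TMP_DIR_PREFIX + UUID.randomUUID());
try {
  for (BodyPart bodyPartFromReq : multiParts.getBodyParts()) {
    // ... stage each segment under batchTempDir and build segmentUploadMetadataList ...
  }
  ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
  zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, allowRefresh,
      headers, segmentUploadMetadataList);
  return new SuccessResponse("Successfully uploaded segments: " + segmentNames + " of table: " + tableNameWithType);
} catch (Exception ex) {
  _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, segmentNames.size());
  throw new ControllerApplicationException(LOGGER, "Exception while uploading segments: " + ex.getMessage(),
      Response.Status.INTERNAL_SERVER_ERROR, ex);
} finally {
  // Single cleanup point replaces the three separately tracked temp-file lists.
  FileUtils.deleteQuietly(batchTempDir);
}
```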
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -295,13 +302,18 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
           extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
       copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
       createSegmentFileFromMultipart(multiPart, destFile);
+      PinotFS pinotFS = null;
Review Comment:
Good catch. You may consider cutting a small PR to land this fix first.
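
The hunk only shows the added declaration; presumably it hoists the `PinotFS` handle out of the try block so it can be closed reliably. A hedged illustration of that pattern (the factory call and surrounding logic are assumptions, not the PR's code):

```java
// Assumed declare-outside-try pattern so the filesystem handle is always closed.
PinotFS pinotFS = null;
try {
  pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
  // ... use pinotFS to copy the segment to its final location ...
} finally {
  if (pinotFS != null) {
    try {
      pinotFS.close();
    } catch (IOException e) {
      LOGGER.warn("Failed to close PinotFS", e);
    }
  }
}
```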
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java:
##########
@@ -555,6 +750,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
   }
 }
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segmentList")
Review Comment:
Maybe consider `v3/segments`, as there is already `v2/segments`.
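
For illustration, the suggested change would only touch the path annotation on the new endpoint (whether and how to version it is the open question; this exact string is hypothetical):

```java
// Hypothetical: align the new batch endpoint with the existing versioned segment paths.
@Path("/v3/segments")
```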