mcvsubbu commented on a change in pull request #3877: Pinot controller side
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264935456
##########
File path:
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
##########
@@ -260,25 +313,203 @@ public String
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
- final String segmentLocation = uploadSegment(multiPart, instanceId,
segmentName, true);
- if (segmentLocation == null) {
+ // Get the segment from the form input and put it in the right place.
+ File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId,
segmentName);
+ if (localTmpFile == null) {
+ LOGGER.error("Unable to get the segment file from multipart input to
local file {}", segmentName);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Response.Params responseParams =
- new
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
- .withSegmentLocation(segmentLocation)
-
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+ try {
+ FileUploadPathProvider provider = new
FileUploadPathProvider(_controllerConf);
+ URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile,
segmentName);
+ if (uri == null) {
+ LOGGER.error("Unable to upload local segment file {} to Pinot storage
for segment ", localTmpFile.toPath(),
+ segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response.Params responseParams =
+ new
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+ .withSegmentLocation(uri.toString())
+
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+ String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
- String response = new
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+ LOGGER.info("Response to segmentUpload:{}", response);
- LOGGER.info("Response to segmentUpload:{}", response);
+ return response;
+ } catch (Exception e) {
- return response;
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ } finally {
+ FileUtils.deleteQuietly(localTmpFile);
+ }
}
- @Nullable
- private String uploadSegment(FormDataMultiPart multiPart, String instanceId,
String segmentName,
- boolean isSplitCommit) {
+ @POST
+ @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public String
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
String instanceId,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String
segmentName,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String
segmentLocation,
+ @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+ @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long
memoryUsedBytes,
+ @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long
buildTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long
waitTimeMillis,
+ @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+ @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long
segmentSizeBytes,
+ FormDataMultiPart metadataFiles) {
+ if (instanceId == null || segmentName == null || offset == -1 ||
segmentLocation == null || metadataFiles == null) {
+ LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={},
segmentLocation={}", offset, segmentName,
+ instanceId, segmentLocation);
+ // TODO: memoryUsedInBytes = 0 if not present in params. Add validation
when we start using it
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+
+ SegmentCompletionProtocol.Request.Params requestParams = new
SegmentCompletionProtocol.Request.Params();
+
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+ .withMemoryUsedBytes(memoryUsedBytes);
+ LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+ final boolean isSuccess = true;
+ final boolean isSplitCommit = true;
+ SegmentMetadataImpl segmentMetadata =
extractMetadataFromInput(metadataFiles, segmentName);
+ // If it fails to extract metadata from the input form, return failure.
+ if (segmentMetadata == null) {
+ LOGGER.error("Segment metadata extraction failure for segment {}",
segmentName);
+ return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+ }
+ SegmentCompletionProtocol.Response response =
SegmentCompletionManager.getInstance()
+ .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
+
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
segmentMetadata));
+ final String responseStr = response.toJsonString();
+ LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+ return responseStr;
+ }
+
+ /**
+ * Extract and return the segment metadata from the two input form data
files (metadata file and creation meta).
+ * Return null if any of the two files is missing or there is exception
during parsing and extraction.
+ *
Review comment:
Extra line: the trailing empty ` *` line in this Javadoc comment is unnecessary; please remove it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]