mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r264895287
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##########
 @@ -254,31 +311,212 @@ public String 
segmentCommit(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.MULTIPART_FORM_DATA)
   public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String 
instanceId,
-      @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String 
segmentName,
-      @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
+                              
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+                              
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, 
FormDataMultiPart multiPart) {
     SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
     
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset);
     LOGGER.info("Processing segmentUpload:{}", requestParams.toString());
 
-    final String segmentLocation = uploadSegment(multiPart, instanceId, 
segmentName, true);
-    if (segmentLocation == null) {
+    // Get the segment from the form input and put it in the right place.
+    File localTmpFile = uploadFileToLocalTmpFile(multiPart, instanceId, 
segmentName);
+    if (localTmpFile == null) {
+      LOGGER.error("Unable to get the segment file from multipart input to 
local file {}", segmentName);
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+    try {
+      FileUploadPathProvider provider = new 
FileUploadPathProvider(_controllerConf);
+      URI uri = localSegementFileToPinotFsTmpLocation(provider, localTmpFile, 
segmentName);
+      if (uri == null) {
+        LOGGER.error("Unable to upload local segment file {} to Pinot 
storage", localTmpFile.toPath());
+        return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+      }
+      SegmentCompletionProtocol.Response.Params responseParams =
+              new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
+                      .withSegmentLocation(uri.toString())
+                      
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+
+      String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+
+      LOGGER.info("Response to segmentUpload:{}", response);
+
+      return response;
+    } catch (Exception e) {
+
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    } finally {
+      FileUtils.deleteQuietly(localTmpFile);
+    }
+  }
+
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+                                             FormDataMultiPart metadataFiles) {
+    if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+              instanceId, segmentLocation);
+      // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+            
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+            
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+            .withMemoryUsedBytes(memoryUsedBytes);
+    LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+    final boolean isSuccess = true;
+    final boolean isSplitCommit = true;
+    SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+    // If it fails to extract metadata from the input form, return failure.
+    if (segmentMetadata == null) {
+      LOGGER.error("Segment metadata extraction failure for segment {}", 
segmentName);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Response.Params responseParams =
-        new 
SegmentCompletionProtocol.Response.Params().withOffset(requestParams.getOffset())
-            .withSegmentLocation(segmentLocation)
-            
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS);
+    SegmentCompletionProtocol.Response response =
+            
SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, 
isSuccess, isSplitCommit,
+                    
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
+    final String responseStr = response.toJsonString();
+    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    return responseStr;
+  }
 
-    String response = new 
SegmentCompletionProtocol.Response(responseParams).toJsonString();
+  /**
+   * Extract and return the segment metadata from the two input form data 
files (metadata file and creation meta).
+   * Return null if any of the two files is missing or there is exception 
during parsing and extraction.
+   *
+   */
+  private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart 
metadataFiles, String segmentNameStr) {
+    String tempMetadataDirStr = StringUtil.join("/", 
_controllerConf.getLocalTempDir(),
+            segmentNameStr + METADATA_TEMP_DIR_SUFFIX + 
String.valueOf(System.currentTimeMillis()));
+    File tempMetadataDir = new File(tempMetadataDirStr);
+    try {
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create 
directory: %s", tempMetadataDirStr);
+      // Extract metadata.properties from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.MetadataKeys.METADATA_FILE_NAME)) {
+        return null;
+      }
+      // Extract creation.meta from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.SEGMENT_CREATION_META)) {
+        return null;
+      }
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      LOGGER.error("Exception extracting and reading segment metadata for {}", 
segmentNameStr, e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
 
-    LOGGER.info("Response to segmentUpload:{}", response);
+  /**
+   *
+   * Extract a single file with name metaFileName from the input 
FormDataMultiPart and put it under the path
+   * tempMetadataDirStr + metaFileName.
+   * Return true iff the extraction and copy is successful.
+   */
+  private boolean extractMetadataFromInputField(FormDataMultiPart 
metadataFiles, String tempMetadataDirStr,
+                                                String metaFileName) {
+    FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+    Preconditions.checkNotNull(metadataFilesField, "The metadata input field 
%s does not exist.",
+            metaFileName);
+
+    try (InputStream metadataPropertiesInputStream = 
metadataFilesField.getValueAs(InputStream.class)) {
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to 
parse %s from input.",
+              metaFileName);
+      java.nio.file.Path metadataPropertiesPath =
+              FileSystems.getDefault().getPath(tempMetadataDirStr, 
metaFileName);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to copy metadata property file: {}", metaFileName, 
e);
 
 Review comment:
   Include the segment name in this log message. You may have to pass it in 
as a parameter, but it is worth it.
   Also, change 'copy' to 'extract'.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to