mcvsubbu commented on a change in pull request #3877: Pinot controller side 
change to enhance LLC segment metadata upload.
URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263956908
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
 ##########
 @@ -276,10 +317,182 @@ public String 
segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE
     return response;
   }
 
+  @POST
+  @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA)
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  public String 
segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID)
 String instanceId,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String 
segmentLocation,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long 
memoryUsedBytes,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long 
buildTimeMillis,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long 
waitTimeMillis,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
+                                             
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long 
segmentSizeBytes,
+                                             FormDataMultiPart metadataFiles) {
+    if (instanceId == null || segmentName == null || offset == -1 || 
segmentLocation == null || metadataFiles == null) {
+      LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, 
segmentLocation={}", offset, segmentName,
+              instanceId, segmentLocation);
+      // TODO: memoryUsedInBytes = 0 if not present in params. Add validation 
when we start using it
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+
+    SegmentCompletionProtocol.Request.Params requestParams = new 
SegmentCompletionProtocol.Request.Params();
+    
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset)
+            
.withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes)
+            
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
+            .withMemoryUsedBytes(memoryUsedBytes);
+    LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString());
+
+
+    final boolean isSuccess = true;
+    final boolean isSplitCommit = true;
+    SegmentMetadataImpl segmentMetadata = 
extractMetadataFromInput(metadataFiles, segmentName);
+    // If it fails to extract metadata from the input form, return failure.
+    if (segmentMetadata == null) {
+      LOGGER.warn("Segment metadata extraction failure for segment {}", 
segmentName);
+      return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
+    }
+    SegmentCompletionProtocol.Response response =
+            
SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, 
isSuccess, isSplitCommit,
+                    
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams,
 segmentMetadata));
+    final String responseStr = response.toJsonString();
+    LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
+    return responseStr;
+  }
+
+  /**
+   * Extract and return the segment metadata from the two input form data 
files (metadata file and creation meta).
+   * Return null if any of the two files is missing or there is exception 
during parsing and extraction.
+   *
+   */
+  private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart 
metadataFiles, String segmentNameStr) {
+    String tempMetadataDirStr = StringUtil.join("/", 
_controllerConf.getLocalTempDir(), segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+    File tempMetadataDir = new File(tempMetadataDirStr);
+
+    try {
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create 
directory: %s", tempMetadataDirStr);
+      // Extract metadata.properties from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.MetadataKeys.METADATA_FILE_NAME)) {
+        return null;
+      }
+      // Extract creation.meta from the metadataFiles.
+      if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, 
V1Constants.SEGMENT_CREATION_META)) {
+        return null;
+      }
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      LOGGER.error("Exception extracting and reading segment metadata for {}", 
segmentNameStr, e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
+
+  /**
+   *
+   * Extract a single file with name metaFileName from the input 
FormDataMultiPart and put it under the path
+   * tempMetadataDirStr + metaFileName.
+   * Return true iff the extraction and copy is successful.
+   */
+  private boolean extractMetadataFromInputField(FormDataMultiPart 
metadataFiles, String tempMetadataDirStr,
+                                                String metaFileName) {
+    FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName);
+    Preconditions.checkNotNull(metadataFilesField, "The metadata input field 
%s does not exist.",
+            metaFileName);
+
+    try (InputStream metadataPropertiesInputStream = 
metadataFilesField.getValueAs(InputStream.class)) {
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to 
parse %s from input.",
+              metaFileName);
+      java.nio.file.Path metadataPropertiesPath =
+              FileSystems.getDefault().getPath(tempMetadataDirStr, 
metaFileName);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to copy metadata property file: {}", metaFileName, 
e);
+    }
+    return false;
+  }
+
+  /**
+   * Extract the segment metadata files from the tar-zipped segment file
+   * <p>Segment tar-zipped file path: segmentLocation URI which can be either 
local or remote storage.
+   * <p>We extract the metadata.properties and creation.meta into a temporary 
metadata directory:
+   * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from 
there.
+   *
+   * @param segmentNameStr Name of the segment
+   * @param segmentLocation the location of the segment file which could be 
local or in deep storage.
+   * @return SegmentMetadataImpl if it is able to extract the metadata file 
from the tar-zipped segment file.
+   */
+  private SegmentMetadataImpl extractMetadataFromSegmentFile(final String 
segmentNameStr,
+                                                             final URI 
segmentLocation) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), 
segmentName.getTableName());
+    String tempSegmentDataDirStr = StringUtil.join("/", baseDirStr, 
segmentNameStr +
+            SEGMENT_TMP_DIR + String.valueOf(System.currentTimeMillis()));
+    File tempSegmentDataDir = new File(tempSegmentDataDirStr);
+    File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, 
segmentNameStr));
+    // Use PinotFS to copy the segment file to local fs for metadata 
extraction.
+    PinotFS pinotFS = 
PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme());
+    try {
+      Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create 
directory: %s", tempSegmentDataDir);
+      pinotFS.copyToLocalFile(segmentLocation, segDstFile);
+      return getSegmentMetadataFromLocalFile(segmentName, segDstFile);
+    } catch (Exception e) {
+      LOGGER.error("Exception copy segment file to local {}",  segmentNameStr, 
e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempSegmentDataDir);
+    }
+  }
+
+  private SegmentMetadataImpl getSegmentMetadataFromLocalFile(LLCSegmentName 
segmentName, File segmentFile) {
+    String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), 
segmentName.getTableName());
+    String tempMetadataDirStr = StringUtil.join("/", baseDirStr,
+            segmentName.getSegmentName() + METADATA_TEMP_DIR_SUFFIX + 
String.valueOf(System.currentTimeMillis()));
+    File tempMetadataDir = new File(tempMetadataDirStr);
+    try (// Extract metadata.properties
+         InputStream metadataPropertiesInputStream = TarGzCompressionUtils
+                 .unTarOneFile(new FileInputStream(segmentFile), 
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+         // Extract creation.meta
+         InputStream creationMetaInputStream = TarGzCompressionUtils
+                 .unTarOneFile(new FileInputStream(segmentFile), 
V1Constants.SEGMENT_CREATION_META)
+         ) {
+      Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create 
directory: %s", tempMetadataDirStr);
+      Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not 
exist",
+              V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      java.nio.file.Path metadataPropertiesPath =
+              FileSystems.getDefault().getPath(tempMetadataDirStr, 
V1Constants.MetadataKeys.METADATA_FILE_NAME);
+      Files.copy(metadataPropertiesInputStream, metadataPropertiesPath);
+
+      Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", 
V1Constants.SEGMENT_CREATION_META);
+      java.nio.file.Path creationMetaPath = 
FileSystems.getDefault().getPath(tempMetadataDirStr, 
V1Constants.SEGMENT_CREATION_META);
+      Files.copy(creationMetaInputStream, creationMetaPath);
+      // Load segment metadata
+      return new SegmentMetadataImpl(tempMetadataDir);
+    } catch (Exception e) {
+      LOGGER.error("Exception extracting and reading segment metadata for {}", 
segmentName.getSegmentName(), e);
+      return null;
+    } finally {
+      FileUtils.deleteQuietly(tempMetadataDir);
+    }
+  }
+
+  /**
+   *
+   * @param multiPart input uploaded files.
+   * @param instanceId
+   * @param segmentName
+   * @param isSplitCommit
+   * @return (1) the URI for the uploaded segment file and; (2) for a 
non-split commit, the corresponding segment metadata.
+   */
   @Nullable
-  private String uploadSegment(FormDataMultiPart multiPart, String instanceId, 
String segmentName,
+  private ImmutablePair<URI, SegmentMetadataImpl> 
uploadSegment(FormDataMultiPart multiPart, String instanceId, String 
segmentName,
 
 Review comment:
   OK, this method is getting too confusing now with the split-commit argument.
The way it was structured probably made sense when we only had local storage.
   Please remove this method. Instead, let us do:
   segmentCommit() {
     uploadFileToLocalTmpFile()
     extractMetadataFromLocalFile()
     storeLocalFileToPinotFS()  (be sure to carry over the existing comments)
   }
   segmentUpload() {
     uploadFileToLocalStore()
     storeLocalFileToPinotFS()
   }
   I tried this refactor locally and it works. It lets us get rid of this
method and the multiple splitCommit checks inside it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to