mcvsubbu commented on a change in pull request #3877: Pinot controller side change to enhance LLC segment metadata upload. URL: https://github.com/apache/incubator-pinot/pull/3877#discussion_r263956908
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java ########## @@ -276,10 +317,182 @@ public String segmentUpload(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE return response; } + @POST + @Path(SegmentCompletionProtocol.MSG_TYPE_COMMIT_END_METADATA) + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol.PARAM_INSTANCE_ID) String instanceId, + @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_NAME) String segmentName, + @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION) String segmentLocation, + @QueryParam(SegmentCompletionProtocol.PARAM_OFFSET) long offset, + @QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes, + @QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis, + @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis, + @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows, + @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes, + FormDataMultiPart metadataFiles) { + if (instanceId == null || segmentName == null || offset == -1 || segmentLocation == null || metadataFiles == null) { + LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, segmentLocation={}", offset, segmentName, + instanceId, segmentLocation); + // TODO: memoryUsedInBytes = 0 if not present in params. 
Add validation when we start using it + return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); + } + + SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params(); + requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset) + .withSegmentLocation(segmentLocation).withSegmentSizeBytes(segmentSizeBytes) + .withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows) + .withMemoryUsedBytes(memoryUsedBytes); + LOGGER.info("Processing segmentCommitEnd:{}", requestParams.toString()); + + + final boolean isSuccess = true; + final boolean isSplitCommit = true; + SegmentMetadataImpl segmentMetadata = extractMetadataFromInput(metadataFiles, segmentName); + // If it fails to extract metadata from the input form, return failure. + if (segmentMetadata == null) { + LOGGER.warn("Segment metadata extraction failure for segment {}", segmentName); + return SegmentCompletionProtocol.RESP_FAILED.toJsonString(); + } + SegmentCompletionProtocol.Response response = + SegmentCompletionManager.getInstance().segmentCommitEnd(requestParams, isSuccess, isSplitCommit, + CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata)); + final String responseStr = response.toJsonString(); + LOGGER.info("Response to segmentCommitEnd:{}", responseStr); + return responseStr; + } + + /** + * Extract and return the segment metadata from the two input form data files (metadata file and creation meta). + * Return null if any of the two files is missing or there is exception during parsing and extraction. 
+ * + */ + private SegmentMetadataImpl extractMetadataFromInput(FormDataMultiPart metadataFiles, String segmentNameStr) { + String tempMetadataDirStr = StringUtil.join("/", _controllerConf.getLocalTempDir(), segmentNameStr + METADATA_TEMP_DIR_SUFFIX); + File tempMetadataDir = new File(tempMetadataDirStr); + + try { + Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr); + // Extract metadata.properties from the metadataFiles. + if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME)) { + return null; + } + // Extract creation.meta from the metadataFiles. + if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META)) { + return null; + } + // Load segment metadata + return new SegmentMetadataImpl(tempMetadataDir); + } catch (Exception e) { + LOGGER.error("Exception extracting and reading segment metadata for {}", segmentNameStr, e); + return null; + } finally { + FileUtils.deleteQuietly(tempMetadataDir); + } + } + + /** + * + * Extract a single file with name metaFileName from the input FormDataMultiPart and put it under the path + * tempMetadataDirStr + metaFileName. + * Return true iff the extraction and copy is successful. 
+ */ + private boolean extractMetadataFromInputField(FormDataMultiPart metadataFiles, String tempMetadataDirStr, + String metaFileName) { + FormDataBodyPart metadataFilesField = metadataFiles.getField(metaFileName); + Preconditions.checkNotNull(metadataFilesField, "The metadata input field %s does not exist.", + metaFileName); + + try (InputStream metadataPropertiesInputStream = metadataFilesField.getValueAs(InputStream.class)) { + Preconditions.checkNotNull(metadataPropertiesInputStream, "Unable to parse %s from input.", + metaFileName); + java.nio.file.Path metadataPropertiesPath = + FileSystems.getDefault().getPath(tempMetadataDirStr, metaFileName); + Files.copy(metadataPropertiesInputStream, metadataPropertiesPath); + return true; + } catch (IOException e) { + LOGGER.error("Failed to copy metadata property file: {}", metaFileName, e); + } + return false; + } + + /** + * Extract the segment metadata files from the tar-zipped segment file + * <p>Segment tar-zipped file path: segmentLocation URI which can be either local or remote storage. + * <p>We extract the metadata.properties and creation.meta into a temporary metadata directory: + * DATADIR/rawTableName/segmentName.metadata.tmp, and load metadata from there. + * + * @param segmentNameStr Name of the segment + * @param segmentLocation the location of the segment file which could be local or in deep storage. + * @return SegmentMetadataImpl if it is able to extract the metadata file from the tar-zipped segment file. 
+ */ + private SegmentMetadataImpl extractMetadataFromSegmentFile(final String segmentNameStr, + final URI segmentLocation) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName()); + String tempSegmentDataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + + SEGMENT_TMP_DIR + String.valueOf(System.currentTimeMillis())); + File tempSegmentDataDir = new File(tempSegmentDataDirStr); + File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, segmentNameStr)); + // Use PinotFS to copy the segment file to local fs for metadata extraction. + PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme()); + try { + Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create directory: %s", tempSegmentDataDir); + pinotFS.copyToLocalFile(segmentLocation, segDstFile); + return getSegmentMetadataFromLocalFile(segmentName, segDstFile); + } catch (Exception e) { + LOGGER.error("Exception copy segment file to local {}", segmentNameStr, e); + return null; + } finally { + FileUtils.deleteQuietly(tempSegmentDataDir); + } + } + + private SegmentMetadataImpl getSegmentMetadataFromLocalFile(LLCSegmentName segmentName, File segmentFile) { + String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), segmentName.getTableName()); + String tempMetadataDirStr = StringUtil.join("/", baseDirStr, + segmentName.getSegmentName() + METADATA_TEMP_DIR_SUFFIX + String.valueOf(System.currentTimeMillis())); + File tempMetadataDir = new File(tempMetadataDirStr); + try (// Extract metadata.properties + InputStream metadataPropertiesInputStream = TarGzCompressionUtils + .unTarOneFile(new FileInputStream(segmentFile), V1Constants.MetadataKeys.METADATA_FILE_NAME); + // Extract creation.meta + InputStream creationMetaInputStream = TarGzCompressionUtils + .unTarOneFile(new FileInputStream(segmentFile), 
V1Constants.SEGMENT_CREATION_META) + ) { + Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr); + Preconditions.checkNotNull(metadataPropertiesInputStream, "%s does not exist", + V1Constants.MetadataKeys.METADATA_FILE_NAME); + java.nio.file.Path metadataPropertiesPath = + FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME); + Files.copy(metadataPropertiesInputStream, metadataPropertiesPath); + + Preconditions.checkNotNull(creationMetaInputStream, "%s does not exist", V1Constants.SEGMENT_CREATION_META); + java.nio.file.Path creationMetaPath = FileSystems.getDefault().getPath(tempMetadataDirStr, V1Constants.SEGMENT_CREATION_META); + Files.copy(creationMetaInputStream, creationMetaPath); + // Load segment metadata + return new SegmentMetadataImpl(tempMetadataDir); + } catch (Exception e) { + LOGGER.error("Exception extracting and reading segment metadata for {}", segmentName.getSegmentName(), e); + return null; + } finally { + FileUtils.deleteQuietly(tempMetadataDir); + } + } + + /** + * + * @param multiPart input uploaded files. + * @param instanceId + * @param segmentName + * @param isSplitCommit + * @return (1) the URI for the uploaded segment file and; (2) for a non-split commit, the corresponding segment metadata. + */ @Nullable - private String uploadSegment(FormDataMultiPart multiPart, String instanceId, String segmentName, + private ImmutablePair<URI, SegmentMetadataImpl> uploadSegment(FormDataMultiPart multiPart, String instanceId, String segmentName, Review comment: OK, this method is getting to be too confusing now, with the split-commit argument. Perhaps the way it was made sense when we had local storage only. Please remove this method. 
Instead, let us do:

    segmentCommit() {
      uploadFileToLocalTmpFile()
      ExtractMetadataFromLocalFile()
      StoreLocalFileToPinotFS (be sure to copy the comments)
    }

    segmentUpload() {
      uploadFileToLocalStore()
      StoreLocalFileToPinotFS()
    }

I tried a local re-factor and that can work, and we can get rid of this method and multiple checks for splitCommit inside that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org