Jackie-Jiang commented on a change in pull request #5967: URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496966149
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java ########## @@ -460,6 +465,40 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart, } } + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/segments/metadata") + @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata") + public void uploadSegmentMetadataAsJson(String segmentJsonStr, + @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName, + @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection, + @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { + try { + asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Path("/segments/metadata") + @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a segment using segment metadata") + public void uploadSegmentMetadataAsMultiPart(FormDataMultiPart multiPart, + @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName, + @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection, + @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { + try { + asyncResponse.resume(uploadSegment(tableName, 
multiPart, enableParallelPushProtection, headers, request, false)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + Review comment: I don't think we need to add these 2 new APIs, we can still use the existing segment upload API but with metadata upload in the header. I feel it is better to use the same path for segment upload. ########## File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java ########## @@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s } } } + + public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap) Review comment: Add some javadoc? ########## File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java ########## @@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s } } } + + public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap) + throws Exception { + String tableName = spec.getTableSpec().getTableName(); + LOGGER.info("Start pushing segment metadata: {}... 
to locations: {} for table {}", + segmentUriToTarPathMap, + Arrays.toString(spec.getPinotClusterSpecs()), tableName); + for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { + String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); + String fileName = new File(tarFilePath).getName(); + Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); + String segmentName = fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()); + File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + try { + for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { + URI controllerURI; + try { + controllerURI = new URI(pinotClusterSpec.getControllerURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); + } + LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName); + int attempts = 1; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { + attempts = spec.getPushJobSpec().getPushAttempts(); + } + long retryWaitMs = 1000L; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { + retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); + } + RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { + try { + List<Header> headers = ImmutableList.of( + new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath), + new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.URI.toString())); Review comment: (Critical) Shouldn't this be `METADATA`? 
########## File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java ########## @@ -177,4 +188,127 @@ public static void sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s } } } + + public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap) + throws Exception { + String tableName = spec.getTableSpec().getTableName(); + LOGGER.info("Start pushing segment metadata: {}... to locations: {} for table {}", Review comment: (nit) Why have the `...` here? We are not omitting any segments here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org