Jackie-Jiang commented on a change in pull request #5967:
URL: https://github.com/apache/incubator-pinot/pull/5967#discussion_r496966149



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -460,6 +465,40 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart 
multiPart,
     }
   }
 
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a 
segment using segment metadata")
+  public void uploadSegmentMetadataAsJson(String segmentJsonStr,
+      @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, null, 
enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+
+  @POST
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.MULTIPART_FORM_DATA)
+  @Path("/segments/metadata")
+  @ApiOperation(value = "Upload a segment with metadata", notes = "Upload a 
segment using segment metadata")
+  public void uploadSegmentMetadataAsMultiPart(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table") 
@QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String 
tableName,
+      @ApiParam(value = "Whether to enable parallel push protection") 
@DefaultValue("false") 
@QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION)
 boolean enableParallelPushProtection,
+      @Context HttpHeaders headers, @Context Request request, @Suspended final 
AsyncResponse asyncResponse) {
+    try {
+      asyncResponse.resume(uploadSegment(tableName, multiPart, 
enableParallelPushProtection, headers, request, false));
+    } catch (Throwable t) {
+      asyncResponse.resume(t);
+    }
+  }
+

Review comment:
       I don't think we need to add these 2 new APIs, we can still use the 
existing segment upload API but with metadata upload in the header. I feel it 
is better to use the same path for segment upload. 

##########
File path: 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void 
sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)

Review comment:
       Add some javadoc?

##########
File path: 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void 
sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for 
table {}",
+        segmentUriToTarPathMap,
+        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+      String fileName = new File(tarFilePath).getName();
+      
Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT));
+      String segmentName = fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length());
+      File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+      try {
+        for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
+          URI controllerURI;
+          try {
+            controllerURI = new URI(pinotClusterSpec.getControllerURI());
+          } catch (URISyntaxException e) {
+            throw new RuntimeException("Got invalid controller uri - '" + 
pinotClusterSpec.getControllerURI() + "'");
+          }
+          LOGGER.info("Pushing segment: {} to location: {} for table {}", 
segmentName, controllerURI, tableName);
+          int attempts = 1;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushAttempts() > 0) {
+            attempts = spec.getPushJobSpec().getPushAttempts();
+          }
+          long retryWaitMs = 1000L;
+          if (spec.getPushJobSpec() != null && 
spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) {
+            retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis();
+          }
+          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 
5).attempt(() -> {
+            try {
+              List<Header> headers = ImmutableList.of(
+                  new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, 
segmentUriPath),
+                  new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, 
FileUploadDownloadClient.FileUploadType.URI.toString()));

Review comment:
       (Critical) Shouldn't this be `METADATA`?

##########
File path: 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
##########
@@ -177,4 +188,127 @@ public static void 
sendSegmentUris(SegmentGenerationJobSpec spec, List<String> s
       }
     }
   }
+
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, 
PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+      throws Exception {
+    String tableName = spec.getTableSpec().getTableName();
+    LOGGER.info("Start pushing segment metadata: {}... to locations: {} for 
table {}",

Review comment:
       (nit) Why having the `...` here? We are not omitting any segment here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to