swaminathanmanish commented on code in PR #14139:
URL: https://github.com/apache/pinot/pull/14139#discussion_r1793243424


##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -106,28 +192,123 @@ public void run() {
     } else {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
       JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
+
+      if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
+        // Process segments in batch mode using foreachPartition
+        pathRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
+          @Override
+          public void call(Iterator<String> segmentIterator) throws Exception {
+            PluginManager.get().init();
+            setupFileSystems();
+
+            List<String> segmentsInPartition = new ArrayList<>();
+            segmentIterator.forEachRemaining(segmentsInPartition::add);
+
+            try {
+              Map<String, String> segmentUriToTarPathMap =
+                  SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI, 
_spec.getPushJobSpec(),
+                      segmentsInPartition.toArray(new String[0]));
+              SegmentPushUtils.sendSegmentUriAndMetadata(_spec, 
PinotFSFactory.create(outputDirURI.getScheme()),
+                  segmentUriToTarPathMap);
+            } catch (RetriableOperationException | AttemptsExceededException 
e) {
+              throw new RuntimeException(e);
+            }
           }
-          try {
-            Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
-                .getSegmentUriToTarPathMap(finalOutputDirURI, 
_spec.getPushJobSpec(), new String[]{segmentTarPath});
-            SegmentPushUtils.sendSegmentUriAndMetadata(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                segmentUriToTarPathMap);
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
+        });
+      } else {
+        // Process segments one by one using foreach
+        pathRDD.foreach(new VoidFunction<String>() {
+          @Override
+          public void call(String segmentTarPath) throws Exception {

Review Comment:
   Do we have tests for this change? non-consistent push batch upload code path



##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -41,16 +50,45 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.scheduler.JobFailed;
+import org.apache.spark.scheduler.JobResult;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
 
 
 public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+  // This listener is added to the SparkContext and is executed when the Spark 
job fails.
+  // It handles the failure by calling 
ConsistentDataPushUtils.handleUploadException.
+  // The listener is only added if consistent data push is enabled and the 
pushParallelism is greater than 1.
+  // This listener is not a required part of the implementation, as the start 
replace segments protocol
+  // will cleanup past failures as part of the fresh consistent data push, but 
it's still cleaner to handle
+  // the failure as soon as possible.
+  private static class ConsistentDataPushFailureHandler extends SparkListener {
+    private final SegmentGenerationJobSpec _spec;
+    private final Map<URI, String> _uriToLineageEntryIdMap;
 
-  public SparkSegmentMetadataPushJobRunner() {
+    public ConsistentDataPushFailureHandler(SegmentGenerationJobSpec spec, 
Map<URI, String> uriToLineageEntryIdMap) {

Review Comment:
   Do we have tests for this listener code? 



##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -106,28 +192,123 @@ public void run() {
     } else {
       JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
       JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
+
+      if (_spec.getPushJobSpec().isBatchSegmentUpload()) {

Review Comment:
   Clarification:
   Is there a reason why we have this if/else check? we basically call the same 
code whether batch/no-batch right? 
   _spec.getPushJobSpec().isBatchSegmentUpload
   
   foreachPartition applies this on the partition itself (i.e N segments per 
partition based on pushParallelism), whereas forEach goes through each element?
   



##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -59,43 +97,91 @@ public void init(SegmentGenerationJobSpec spec) {
   }
 
   @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
+  public void run()
+      throws Exception {
+    // Initialize all file systems
+    setupFileSystems();
 
-    //Get outputFS for writing output pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
+    // Get outputFS for writing output pinot segments
+    URI outputDirURI = validateOutputDirURI(_spec.getOutputDirURI());
     PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
 
-    List<String> segmentsToPush = new ArrayList<>();
-    for (String file : files) {
-      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentsToPush.add(file);
-      }
-    }
+    // Collect files to process
+    List<String> segmentsToPush = getSegmentsToPush(outputDirFS, outputDirURI);
+
+    // Ensure tableConfigURI is set if missing
+    setupTableConfigURI();
+
+    // Retrieve table config and check for consistent push
+    TableConfig tableConfig =
+        
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), 
_spec.getAuthToken());
+    boolean consistentPushEnabled = 
ConsistentDataPushUtils.consistentDataPushEnabled(tableConfig);
 
+    // Determine push parallelism
     int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
     if (pushParallelism < 1) {
       pushParallelism = segmentsToPush.size();
     }
+
+    // Call the appropriate push helper
+    if (consistentPushEnabled) {
+      handleConsistentPush(segmentsToPush, outputDirURI, pushParallelism);
+    } else {
+      handleNonConsistentPush(segmentsToPush, outputDirFS, outputDirURI, 
pushParallelism);
+    }
+  }
+
+  private void handleConsistentPush(List<String> segmentsToPush, URI 
outputDirURI, int pushParallelism)
+      throws Exception {
+    Map<String, String> segmentsToPushUriToTarPathMap =
+        SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI, 
_spec.getPushJobSpec(),
+            segmentsToPush.toArray(new String[0]));
+    Map<URI, String> uriToLineageEntryIdMap =
+        ConsistentDataPushUtils.preUpload(_spec, 
getSegmentsToReplace(segmentsToPushUriToTarPathMap));
+
+    if (pushParallelism == 1) {
+      // Single push
+      SegmentPushUtils.sendSegmentUriAndMetadata(_spec, 
PinotFSFactory.create(outputDirURI.getScheme()),
+          segmentsToPushUriToTarPathMap);
+    } else {
+      // Parallel push using Spark
+      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+      sparkContext.sc().addSparkListener(new 
ConsistentDataPushFailureHandler(_spec, uriToLineageEntryIdMap));
+      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
+
+      if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
+        // Process segments in batch mode using foreachPartition
+        pathRDD.foreachPartition(segmentIterator -> {
+          setupFileSystems();

Review Comment:
   Does setupFileSystems need to happen for every iteration or can it be 
outside? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to