This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fda93ab083 Spark batch ingestion common code abstraction. (#14415)
fda93ab083 is described below

commit fda93ab0831d495c4dd8ea02a54eadc3d7ed6a53
Author: Abhishek Sharma <[email protected]>
AuthorDate: Sat Dec 14 19:11:19 2024 -0500

    Spark batch ingestion common code abstraction. (#14415)
---
 .../pinot-batch-ingestion-spark-2.4/pom.xml        |   2 +-
 .../spark/SparkSegmentMetadataPushJobRunner.java   | 112 +++++----------------
 .../batch/spark/SparkSegmentTarPushJobRunner.java  | 111 ++++++--------------
 .../batch/spark/SparkSegmentUriPushJobRunner.java  | 111 +++++---------------
 .../pinot-batch-ingestion-spark-3/pom.xml          |   2 +-
 .../spark3/SparkSegmentMetadataPushJobRunner.java  |   1 -
 .../batch/spark3/SparkSegmentTarPushJobRunner.java | 111 +++++---------------
 .../batch/spark3/SparkSegmentUriPushJobRunner.java | 111 +++++---------------
 .../{ => pinot-batch-ingestion-spark-base}/pom.xml |  35 ++++---
 .../BaseSparkSegmentMetadataPushJobRunner.java}    |  50 ++++-----
 .../common/BaseSparkSegmentTarPushJobRunner.java}  |  49 +++------
 .../common/BaseSparkSegmentUriPushJobRunner.java}  |  52 ++++------
 pinot-plugins/pinot-batch-ingestion/pom.xml        |   1 +
 pom.xml                                            |   5 +
 14 files changed, 205 insertions(+), 548 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
index 9eb28d2f49..8b04760514 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
@@ -38,7 +38,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <artifactId>pinot-batch-ingestion-spark-base</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
index f4588cd1cc..fed7f7a150 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -18,20 +18,13 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentMetadataPushJobRunner;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -42,9 +35,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
 
-
-public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public class SparkSegmentMetadataPushJobRunner extends 
BaseSparkSegmentMetadataPushJobRunner {
 
   public SparkSegmentMetadataPushJobRunner() {
   }
@@ -54,80 +45,31 @@ public class SparkSegmentMetadataPushJobRunner implements 
IngestionJobRunner, Se
   }
 
   @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
-  }
-
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
-
-    List<String> segmentsToPush = new ArrayList<>();
-    for (String file : files) {
-      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentsToPush.add(file);
-      }
-    }
-
-    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
-    if (pushParallelism < 1) {
-      pushParallelism = segmentsToPush.size();
-    }
-    if (pushParallelism == 1) {
-      // Push from driver
-      try {
-        SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
-      } catch (RetriableOperationException | AttemptsExceededException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
-          }
-          try {
-            Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
-                .getSegmentUriToTarPathMap(finalOutputDirURI, 
_spec.getPushJobSpec(), new String[]{segmentTarPath});
-            SegmentPushUtils.sendSegmentUriAndMetadata(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                segmentUriToTarPathMap);
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
+  public void parallelizeMetadataPushJob(List<String> segmentsToPush, 
List<PinotFSSpec> pinotFSSpecs,
+      int pushParallelism, URI outputDirURI) {
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
+    URI finalOutputDirURI = outputDirURI;
+    // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
+    // instead.
+    pathRDD.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String segmentTarPath)
+          throws Exception {
+        PluginManager.get().init();
+        for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+          PinotFSFactory
+              .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
+        }
+        try {
+          Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+              .getSegmentUriToTarPathMap(finalOutputDirURI, 
_spec.getPushJobSpec(), new String[]{segmentTarPath});
+          SegmentPushUtils.sendSegmentUriAndMetadata(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+              segmentUriToTarPathMap);
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
         }
-      });
-    }
+      }
+    });
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
index eadd389f7f..babe30e769 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
@@ -18,20 +18,13 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentTarPushJobRunner;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,89 +36,40 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner, 
Serializable {
+
+public class SparkSegmentTarPushJobRunner extends 
BaseSparkSegmentTarPushJobRunner {
-  private SegmentGenerationJobSpec _spec;
 
   public SparkSegmentTarPushJobRunner() {
+    super();
   }
 
   public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
-    init(spec);
-  }
-
-  @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
+    super(spec);
   }
 
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
-
-    List<String> segmentsToPush = new ArrayList<>();
-    for (String file : files) {
-      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentsToPush.add(file);
-      }
-    }
-
-    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
-    if (pushParallelism < 1) {
-      pushParallelism = segmentsToPush.size();
-    }
-    if (pushParallelism == 1) {
-      // Push from driver
-      try {
-        SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
-      } catch (RetriableOperationException | AttemptsExceededException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
-          }
-          try {
-            SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                Arrays.asList(segmentTarPath));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
+  public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+      List<String> segmentUris, int pushParallelism, URI outputDirURI) {
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
+    URI finalOutputDirURI = outputDirURI;
+    // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
+    // instead.
+    pathRDD.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String segmentTarPath)
+          throws Exception {
+        PluginManager.get().init();
+        for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+          PinotFSFactory
+              .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
         }
-      });
-    }
+        try {
+          SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+              Arrays.asList(segmentTarPath));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
index e5080c4748..b809c3bcba 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
@@ -18,20 +18,12 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentUriPushJobRunner;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,94 +35,37 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public class SparkSegmentUriPushJobRunner extends 
BaseSparkSegmentUriPushJobRunner {
 
   public SparkSegmentUriPushJobRunner() {
+    super();
   }
 
   public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
-    init(spec);
+    super(spec);
   }
 
   @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
-    if (_spec.getPushJobSpec() == null) {
-      throw new RuntimeException("Missing PushJobSpec");
-    }
-  }
-
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output Pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
-    List<String> segmentUris = new ArrayList<>();
-    for (String file : files) {
-      URI uri = URI.create(file);
-      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils
-            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
-                _spec.getPushJobSpec().getSegmentUriSuffix());
-        segmentUris.add(updatedURI.toString());
-      }
-    }
-
-    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
-    if (pushParallelism < 1) {
-      pushParallelism = segmentUris.size();
-    }
-    if (pushParallelism == 1) {
-      // Push from driver
-      try {
-        SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
-      } catch (RetriableOperationException | AttemptsExceededException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentUri)
-            throws Exception {
-          try {
-            PluginManager.get().init();
-            for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-              PinotFSFactory
-                  .register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-            }
-            SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
+  public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs, 
List<String> segmentUris, int pushParallelism) {
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
+    // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
+    // instead.
+    pathRDD.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String segmentUri)
+          throws Exception {
+        try {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+            PinotFSFactory
+                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
           }
+          SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
         }
-      });
-    }
+      }
+    });
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
index 74c1dc278e..e43a1a5525 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
@@ -38,7 +38,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <artifactId>pinot-batch-ingestion-spark-base</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
index 4a4e4729e7..e2cb2df38a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
@@ -55,7 +55,6 @@ import org.apache.spark.scheduler.JobResult;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
 
-
 public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, 
Serializable {
   // This listener is added to the SparkContext and is executed when the Spark 
job fails.
   // It handles the failure by calling 
ConsistentDataPushUtils.handleUploadException.
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
index 9d06532858..a8c51fd606 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
@@ -18,20 +18,13 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark3;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentTarPushJobRunner;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -42,90 +35,39 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
 
-
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner, 
Serializable {
+public class SparkSegmentTarPushJobRunner extends 
BaseSparkSegmentTarPushJobRunner {
-  private SegmentGenerationJobSpec _spec;
 
   public SparkSegmentTarPushJobRunner() {
+    super();
   }
 
   public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
-    init(spec);
-  }
-
-  @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
+    super(spec);
   }
 
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
-
-    List<String> segmentsToPush = new ArrayList<>();
-    for (String file : files) {
-      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentsToPush.add(file);
-      }
-    }
-
-    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
-    if (pushParallelism < 1) {
-      pushParallelism = segmentsToPush.size();
-    }
-    if (pushParallelism == 1) {
-      // Push from driver
-      try {
-        SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
-      } catch (RetriableOperationException | AttemptsExceededException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
-          }
-          try {
-            SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                Arrays.asList(segmentTarPath));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
+  public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+      List<String> segmentUris, int pushParallelism, URI outputDirURI) {
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
+    URI finalOutputDirURI = outputDirURI;
+    // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
+    // instead.
+    pathRDD.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String segmentTarPath)
+          throws Exception {
+        PluginManager.get().init();
+        for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+          PinotFSFactory
+              .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
         }
-      });
-    }
+        try {
+          SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+              Arrays.asList(segmentTarPath));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
   }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
index 7b442477c4..a453e99fc9 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
@@ -18,20 +18,12 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.spark3;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentUriPushJobRunner;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,94 +35,37 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public class SparkSegmentUriPushJobRunner extends 
BaseSparkSegmentUriPushJobRunner {
 
   public SparkSegmentUriPushJobRunner() {
+    super();
   }
 
   public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
-    init(spec);
+    super(spec);
   }
 
   @Override
-  public void init(SegmentGenerationJobSpec spec) {
-    _spec = spec;
-    if (_spec.getPushJobSpec() == null) {
-      throw new RuntimeException("Missing PushJobSpec");
-    }
-  }
-
-  @Override
-  public void run() {
-    //init all file systems
-    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
-    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-    }
-
-    //Get outputFS for writing output Pinot segments
-    URI outputDirURI;
-    try {
-      outputDirURI = new URI(_spec.getOutputDirURI());
-      if (outputDirURI.getScheme() == null) {
-        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
-    }
-    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
-    //Get list of files to process
-    String[] files;
-    try {
-      files = outputDirFS.listFiles(outputDirURI, true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
-    }
-    List<String> segmentUris = new ArrayList<>();
-    for (String file : files) {
-      URI uri = URI.create(file);
-      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils
-            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
-                _spec.getPushJobSpec().getSegmentUriSuffix());
-        segmentUris.add(updatedURI.toString());
-      }
-    }
-
-    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
-    if (pushParallelism < 1) {
-      pushParallelism = segmentUris.size();
-    }
-    if (pushParallelism == 1) {
-      // Push from driver
-      try {
-        SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
-      } catch (RetriableOperationException | AttemptsExceededException e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentUri)
-            throws Exception {
-          try {
-            PluginManager.get().init();
-            for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-              PinotFSFactory
-                  .register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-            }
-            SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
+  public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs, 
List<String> segmentUris, int pushParallelism) {
+    JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
+    // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
+    // instead.
+    pathRDD.foreach(new VoidFunction<String>() {
+      @Override
+      public void call(String segmentUri)
+          throws Exception {
+        try {
+          PluginManager.get().init();
+          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+            PinotFSFactory
+                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
           }
+          SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
+        } catch (RetriableOperationException | AttemptsExceededException e) {
+          throw new RuntimeException(e);
         }
-      });
-    }
+      }
+    });
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
similarity index 68%
copy from pinot-plugins/pinot-batch-ingestion/pom.xml
copy to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
index 377e0eca25..ec91276a57 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
 
     Licensed to the Apache Software Foundation (ASF) under one
@@ -22,33 +22,32 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot-plugins</artifactId>
+    <artifactId>pinot-batch-ingestion</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>1.3.0-SNAPSHOT</version>
   </parent>
-  <artifactId>pinot-batch-ingestion</artifactId>
-  <packaging>pom</packaging>
-  <name>Pinot Batch Ingestion</name>
+
+  <artifactId>pinot-batch-ingestion-spark-base</artifactId>
+  <name>Pinot Batch Ingestion Spark Base</name>
   <url>https://pinot.apache.org/</url>
   <properties>
-    <pinot.root>${basedir}/../..</pinot.root>
-    <plugin.type>pinot-batch-ingestion</plugin.type>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <shade.phase.prop>package</shade.phase.prop>
   </properties>
 
-  <modules>
-    <module>pinot-batch-ingestion-common</module>
-    <module>pinot-batch-ingestion-spark-2.4</module>
-    <module>pinot-batch-ingestion-spark-3</module>
-
-    <module>pinot-batch-ingestion-hadoop</module>
-    <module>pinot-batch-ingestion-standalone</module>
-  </modules>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
-      <scope>provided</scope>
+      <artifactId>pinot-batch-ingestion-common</artifactId>
     </dependency>
   </dependencies>
+
+  <profiles>
+    <profile>
+      <id>pinot-fastdev</id>
+      <properties>
+        <shade.phase.prop>none</shade.phase.prop>
+      </properties>
+    </profile>
+  </profiles>
 </project>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
similarity index 67%
copy from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
copy to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
index eadd389f7f..9f1ae01714 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
 
 import java.io.File;
 import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,17 @@ import 
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentMetadataPushJobRunner implements 
IngestionJobRunner, Serializable {
+  protected SegmentGenerationJobSpec _spec;
 
-  public SparkSegmentTarPushJobRunner() {
+  public BaseSparkSegmentMetadataPushJobRunner() {
   }
 
-  public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+  public BaseSparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
     init(spec);
   }
 
@@ -104,28 +98,18 @@ public class SparkSegmentTarPushJobRunner implements 
IngestionJobRunner, Seriali
         throw new RuntimeException(e);
       }
     } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
-          }
-          try {
-            SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                Arrays.asList(segmentTarPath));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
+      parallelizeMetadataPushJob(segmentsToPush, pinotFSSpecs, 
pushParallelism, outputDirURI);
     }
   }
+
+  /**
+   * Parallelizes the metadata push job using Spark to distribute the work 
across multiple nodes.
+   *
+   * @param segmentsToPush the list of segment URIs to be pushed
+   * @param pinotFSSpecs the list of Pinot file system specifications to be 
registered
+   * @param pushParallelism the level of parallelism for the push job
+   * @param outputDirURI the URI of the output directory containing the 
segments
+   */
+  public abstract void parallelizeMetadataPushJob(List<String> segmentsToPush, 
List<PinotFSSpec> pinotFSSpecs,
+      int pushParallelism, URI outputDirURI);
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
similarity index 67%
copy from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
copy to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
index eadd389f7f..b6965368c0 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
 
 import java.io.File;
 import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,17 @@ import 
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentTarPushJobRunner implements 
IngestionJobRunner, Serializable {
+  protected SegmentGenerationJobSpec _spec;
 
-  public SparkSegmentTarPushJobRunner() {
+  public BaseSparkSegmentTarPushJobRunner() {
   }
 
-  public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+  public BaseSparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
     init(spec);
   }
 
@@ -104,28 +98,17 @@ public class SparkSegmentTarPushJobRunner implements 
IngestionJobRunner, Seriali
         throw new RuntimeException(e);
       }
     } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush, 
pushParallelism);
-      URI finalOutputDirURI = outputDirURI;
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentTarPath)
-            throws Exception {
-          PluginManager.get().init();
-          for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-            PinotFSFactory
-                .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), 
new PinotConfiguration(pinotFSSpec));
-          }
-          try {
-            SegmentPushUtils.pushSegments(_spec, 
PinotFSFactory.create(finalOutputDirURI.getScheme()),
-                Arrays.asList(segmentTarPath));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
+      parallelizeTarPushJob(pinotFSSpecs, segmentsToPush, pushParallelism, 
outputDirURI);
     }
   }
+
+  /**
+   * Parallelizes the tar push job using Spark to distribute the work across 
multiple nodes.
+   *
+   * @param pinotFSSpecs the list of Pinot file system specifications to be 
registered
+   * @param segmentUris the list of segment URIs to be pushed
+   * @param pushParallelism the level of parallelism for the push job
+   */
+  public abstract void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+      List<String> segmentUris, int pushParallelism, URI outputDirURI);
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
similarity index 68%
copy from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
copy to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
index e5080c4748..4188f5e4f4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
 
 import java.io.File;
 import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,18 @@ import 
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
 
 
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner, 
Serializable {
-  private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentUriPushJobRunner implements 
IngestionJobRunner, Serializable {
 
-  public SparkSegmentUriPushJobRunner() {
+  protected SegmentGenerationJobSpec _spec;
+
+  public BaseSparkSegmentUriPushJobRunner() {
   }
 
-  public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
+  public BaseSparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
     init(spec);
   }
 
@@ -92,8 +87,8 @@ public class SparkSegmentUriPushJobRunner implements 
IngestionJobRunner, Seriali
     for (String file : files) {
       URI uri = URI.create(file);
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils
-            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
+        URI updatedURI =
+            SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
                 _spec.getPushJobSpec().getSegmentUriSuffix());
         segmentUris.add(updatedURI.toString());
       }
@@ -111,26 +106,17 @@ public class SparkSegmentUriPushJobRunner implements 
IngestionJobRunner, Seriali
         throw new RuntimeException(e);
       }
     } else {
-      JavaSparkContext sparkContext = 
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
-      JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris, 
pushParallelism);
-      // Prevent using lambda expression in Spark to avoid potential 
serialization exceptions, use inner function
-      // instead.
-      pathRDD.foreach(new VoidFunction<String>() {
-        @Override
-        public void call(String segmentUri)
-            throws Exception {
-          try {
-            PluginManager.get().init();
-            for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
-              PinotFSFactory
-                  .register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
-            }
-            SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
-          } catch (RetriableOperationException | AttemptsExceededException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
+      parallelizeUriPushJob(pinotFSSpecs, segmentUris, pushParallelism);
     }
   }
+
+  /**
+   * Parallelizes the uri push job using Spark to distribute the work across 
multiple nodes.
+   *
+   * @param pinotFSSpecs the list of Pinot file system specifications to be 
registered
+   * @param segmentUris the list of segment URIs to be pushed
+   * @param pushParallelism the level of parallelism for the push job
+   */
+  public abstract void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs,
+      List<String> segmentUris, int pushParallelism);
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pom.xml
index 377e0eca25..564c76aaeb 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pom.xml
@@ -37,6 +37,7 @@
 
   <modules>
     <module>pinot-batch-ingestion-common</module>
+    <module>pinot-batch-ingestion-spark-base</module>
     <module>pinot-batch-ingestion-spark-2.4</module>
     <module>pinot-batch-ingestion-spark-3</module>
 
diff --git a/pom.xml b/pom.xml
index 053e5ae414..09a5adff74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -519,6 +519,11 @@
         <artifactId>pinot-batch-ingestion-hadoop</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-batch-ingestion-spark-base</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.pinot</groupId>
         <artifactId>pinot-batch-ingestion-spark-2.4</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to