This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fda93ab083 Spark batch ingestion common code abstraction. (#14415)
fda93ab083 is described below
commit fda93ab0831d495c4dd8ea02a54eadc3d7ed6a53
Author: Abhishek Sharma <[email protected]>
AuthorDate: Sat Dec 14 19:11:19 2024 -0500
Spark batch ingestion common code abstraction. (#14415)
---
.../pinot-batch-ingestion-spark-2.4/pom.xml | 2 +-
.../spark/SparkSegmentMetadataPushJobRunner.java | 112 +++++----------------
.../batch/spark/SparkSegmentTarPushJobRunner.java | 111 ++++++--------------
.../batch/spark/SparkSegmentUriPushJobRunner.java | 111 +++++---------------
.../pinot-batch-ingestion-spark-3/pom.xml | 2 +-
.../spark3/SparkSegmentMetadataPushJobRunner.java | 1 -
.../batch/spark3/SparkSegmentTarPushJobRunner.java | 111 +++++---------------
.../batch/spark3/SparkSegmentUriPushJobRunner.java | 111 +++++---------------
.../{ => pinot-batch-ingestion-spark-base}/pom.xml | 35 ++++---
.../BaseSparkSegmentMetadataPushJobRunner.java} | 50 ++++-----
.../common/BaseSparkSegmentTarPushJobRunner.java} | 49 +++------
.../common/BaseSparkSegmentUriPushJobRunner.java} | 52 ++++------
pinot-plugins/pinot-batch-ingestion/pom.xml | 1 +
pom.xml | 5 +
14 files changed, 205 insertions(+), 548 deletions(-)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
index 9eb28d2f49..8b04760514 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml
@@ -38,7 +38,7 @@
<dependencies>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-batch-ingestion-common</artifactId>
+ <artifactId>pinot-batch-ingestion-spark-base</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
index f4588cd1cc..fed7f7a150 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentMetadataPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentMetadataPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -42,9 +35,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
-
-public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public class SparkSegmentMetadataPushJobRunner extends
BaseSparkSegmentMetadataPushJobRunner {
public SparkSegmentMetadataPushJobRunner() {
}
@@ -54,80 +45,31 @@ public class SparkSegmentMetadataPushJobRunner implements
IngestionJobRunner, Se
}
@Override
- public void init(SegmentGenerationJobSpec spec) {
- _spec = spec;
- }
-
- @Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
-
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
- PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
-
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
-
- int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
- if (pushParallelism < 1) {
- pushParallelism = segmentsToPush.size();
- }
- if (pushParallelism == 1) {
- // Push from driver
- try {
- SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- } else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
- }
- try {
- Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
- .getSegmentUriToTarPathMap(finalOutputDirURI,
_spec.getPushJobSpec(), new String[]{segmentTarPath});
- SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- segmentUriToTarPathMap);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
+ public void parallelizeMetadataPushJob(List<String> segmentsToPush,
List<PinotFSSpec> pinotFSSpecs,
+ int pushParallelism, URI outputDirURI) {
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
+ URI finalOutputDirURI = outputDirURI;
+ // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
+ // instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentTarPath)
+ throws Exception {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
+ }
+ try {
+ Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
+ .getSegmentUriToTarPathMap(finalOutputDirURI,
_spec.getPushJobSpec(), new String[]{segmentTarPath});
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+ segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
}
- });
- }
+ }
+ });
}
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
index eadd389f7f..babe30e769 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentTarPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,89 +36,41 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner,
Serializable {
+
+public class SparkSegmentTarPushJobRunner extends
BaseSparkSegmentTarPushJobRunner {
private SegmentGenerationJobSpec _spec;
public SparkSegmentTarPushJobRunner() {
+ super();
}
public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
- init(spec);
- }
-
- @Override
- public void init(SegmentGenerationJobSpec spec) {
- _spec = spec;
+ super(spec);
}
- @Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
-
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
- PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
-
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
-
- int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
- if (pushParallelism < 1) {
- pushParallelism = segmentsToPush.size();
- }
- if (pushParallelism == 1) {
- // Push from driver
- try {
- SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- } else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
- }
- try {
- SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- Arrays.asList(segmentTarPath));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
+ public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+ List<String> segmentUris, int pushParallelism, URI outputDirURI) {
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
+ URI finalOutputDirURI = outputDirURI;
+ // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
+ // instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentTarPath)
+ throws Exception {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
}
- });
- }
+ try {
+ SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+ Arrays.asList(segmentTarPath));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
index e5080c4748..b809c3bcba 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
@@ -18,20 +18,12 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentUriPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,94 +35,37 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public class SparkSegmentUriPushJobRunner extends
BaseSparkSegmentUriPushJobRunner {
public SparkSegmentUriPushJobRunner() {
+ super();
}
public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
- init(spec);
+ super(spec);
}
@Override
- public void init(SegmentGenerationJobSpec spec) {
- _spec = spec;
- if (_spec.getPushJobSpec() == null) {
- throw new RuntimeException("Missing PushJobSpec");
- }
- }
-
- @Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
-
- //Get outputFS for writing output Pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
- PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
- List<String> segmentUris = new ArrayList<>();
- for (String file : files) {
- URI uri = URI.create(file);
- if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
- URI updatedURI = SegmentPushUtils
- .generateSegmentTarURI(outputDirURI, uri,
_spec.getPushJobSpec().getSegmentUriPrefix(),
- _spec.getPushJobSpec().getSegmentUriSuffix());
- segmentUris.add(updatedURI.toString());
- }
- }
-
- int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
- if (pushParallelism < 1) {
- pushParallelism = segmentUris.size();
- }
- if (pushParallelism == 1) {
- // Push from driver
- try {
- SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- } else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentUri)
- throws Exception {
- try {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
- SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
+ public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism) {
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
+ // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
+ // instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentUri)
+ throws Exception {
+ try {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
}
+ SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
}
- });
- }
+ }
+ });
}
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
index 74c1dc278e..e43a1a5525 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml
@@ -38,7 +38,7 @@
<dependencies>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-batch-ingestion-common</artifactId>
+ <artifactId>pinot-batch-ingestion-spark-base</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
index 4a4e4729e7..e2cb2df38a 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java
@@ -55,7 +55,6 @@ import org.apache.spark.scheduler.JobResult;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
-
public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner,
Serializable {
// This listener is added to the SparkContext and is executed when the Spark
job fails.
// It handles the failure by calling
ConsistentDataPushUtils.handleUploadException.
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
index 9d06532858..a8c51fd606 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentTarPushJobRunner.java
@@ -18,20 +18,13 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark3;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentTarPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -42,90 +35,40 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
-
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner,
Serializable {
+public class SparkSegmentTarPushJobRunner extends
BaseSparkSegmentTarPushJobRunner {
private SegmentGenerationJobSpec _spec;
public SparkSegmentTarPushJobRunner() {
+ super();
}
public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
- init(spec);
- }
-
- @Override
- public void init(SegmentGenerationJobSpec spec) {
- _spec = spec;
+ super(spec);
}
- @Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
-
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
- PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
-
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
-
- int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
- if (pushParallelism < 1) {
- pushParallelism = segmentsToPush.size();
- }
- if (pushParallelism == 1) {
- // Push from driver
- try {
- SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- } else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
- }
- try {
- SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- Arrays.asList(segmentTarPath));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
+ public void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+ List<String> segmentUris, int pushParallelism, URI outputDirURI) {
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
+ URI finalOutputDirURI = outputDirURI;
+ // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
+ // instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentTarPath)
+ throws Exception {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
}
- });
- }
+ try {
+ SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
+ Arrays.asList(segmentTarPath));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
index 7b442477c4..a453e99fc9 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentUriPushJobRunner.java
@@ -18,20 +18,12 @@
*/
package org.apache.pinot.plugin.ingestion.batch.spark3;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import
org.apache.pinot.plugin.ingestion.batch.spark.common.BaseSparkSegmentUriPushJobRunner;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
-import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -43,94 +35,37 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public class SparkSegmentUriPushJobRunner extends
BaseSparkSegmentUriPushJobRunner {
public SparkSegmentUriPushJobRunner() {
+ super();
}
public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
- init(spec);
+ super(spec);
}
@Override
- public void init(SegmentGenerationJobSpec spec) {
- _spec = spec;
- if (_spec.getPushJobSpec() == null) {
- throw new RuntimeException("Missing PushJobSpec");
- }
- }
-
- @Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
-
- //Get outputFS for writing output Pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
- PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
-
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
- List<String> segmentUris = new ArrayList<>();
- for (String file : files) {
- URI uri = URI.create(file);
- if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
- URI updatedURI = SegmentPushUtils
- .generateSegmentTarURI(outputDirURI, uri,
_spec.getPushJobSpec().getSegmentUriPrefix(),
- _spec.getPushJobSpec().getSegmentUriSuffix());
- segmentUris.add(updatedURI.toString());
- }
- }
-
- int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
- if (pushParallelism < 1) {
- pushParallelism = segmentUris.size();
- }
- if (pushParallelism == 1) {
- // Push from driver
- try {
- SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- } else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentUri)
- throws Exception {
- try {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
- SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
+ public void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs,
List<String> segmentUris, int pushParallelism) {
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
+ // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
+ // instead.
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentUri)
+ throws Exception {
+ try {
+ PluginManager.get().init();
+ for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+ PinotFSFactory
+ .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
}
+ SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
+ } catch (RetriableOperationException | AttemptsExceededException e) {
+ throw new RuntimeException(e);
}
- });
- }
+ }
+ });
}
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
similarity index 68%
copy from pinot-plugins/pinot-batch-ingestion/pom.xml
copy to
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
index 377e0eca25..ec91276a57 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -22,33 +22,32 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>pinot-plugins</artifactId>
+ <artifactId>pinot-batch-ingestion</artifactId>
<groupId>org.apache.pinot</groupId>
<version>1.3.0-SNAPSHOT</version>
</parent>
- <artifactId>pinot-batch-ingestion</artifactId>
- <packaging>pom</packaging>
- <name>Pinot Batch Ingestion</name>
+
+ <artifactId>pinot-batch-ingestion-spark-base</artifactId>
+ <name>Pinot Batch Ingestion Spark Base</name>
<url>https://pinot.apache.org/</url>
<properties>
- <pinot.root>${basedir}/../..</pinot.root>
- <plugin.type>pinot-batch-ingestion</plugin.type>
+ <pinot.root>${basedir}/../../..</pinot.root>
+ <shade.phase.prop>package</shade.phase.prop>
</properties>
- <modules>
- <module>pinot-batch-ingestion-common</module>
- <module>pinot-batch-ingestion-spark-2.4</module>
- <module>pinot-batch-ingestion-spark-3</module>
-
- <module>pinot-batch-ingestion-hadoop</module>
- <module>pinot-batch-ingestion-standalone</module>
- </modules>
-
<dependencies>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-core</artifactId>
- <scope>provided</scope>
+ <artifactId>pinot-batch-ingestion-common</artifactId>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>pinot-fastdev</id>
+ <properties>
+ <shade.phase.prop>none</shade.phase.prop>
+ </properties>
+ </profile>
+ </profiles>
</project>
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
similarity index 67%
copy from
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
copy to
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
index eadd389f7f..9f1ae01714 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentMetadataPushJobRunner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
import java.io.File;
import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,17 @@ import
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentMetadataPushJobRunner implements
IngestionJobRunner, Serializable {
+ protected SegmentGenerationJobSpec _spec;
- public SparkSegmentTarPushJobRunner() {
+ public BaseSparkSegmentMetadataPushJobRunner() {
}
- public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+ public BaseSparkSegmentMetadataPushJobRunner(SegmentGenerationJobSpec spec) {
init(spec);
}
@@ -104,28 +98,18 @@ public class SparkSegmentTarPushJobRunner implements
IngestionJobRunner, Seriali
throw new RuntimeException(e);
}
} else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
- }
- try {
- SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- Arrays.asList(segmentTarPath));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ parallelizeMetadataPushJob(segmentsToPush, pinotFSSpecs,
pushParallelism, outputDirURI);
}
}
+
+ /**
+ * Parallelizes the metadata push job using Spark to distribute the work
across multiple nodes.
+ *
+ * @param segmentsToPush the list of segment URIs to be pushed
+ * @param pinotFSSpecs the list of Pinot file system specifications to be
registered
+ * @param pushParallelism the level of parallelism for the push job
+ * @param outputDirURI the URI of the output directory containing the
segments
+ */
+ public abstract void parallelizeMetadataPushJob(List<String> segmentsToPush,
List<PinotFSSpec> pinotFSSpecs,
+ int pushParallelism, URI outputDirURI);
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
similarity index 67%
copy from
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
copy to
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
index eadd389f7f..b6965368c0 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentTarPushJobRunner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
import java.io.File;
import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,17 @@ import
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentTarPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentTarPushJobRunner implements
IngestionJobRunner, Serializable {
+ protected SegmentGenerationJobSpec _spec;
- public SparkSegmentTarPushJobRunner() {
+ public BaseSparkSegmentTarPushJobRunner() {
}
- public SparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+ public BaseSparkSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
init(spec);
}
@@ -104,28 +98,17 @@ public class SparkSegmentTarPushJobRunner implements
IngestionJobRunner, Seriali
throw new RuntimeException(e);
}
} else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
- }
- try {
- SegmentPushUtils.pushSegments(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- Arrays.asList(segmentTarPath));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ parallelizeTarPushJob(pinotFSSpecs, segmentsToPush, pushParallelism,
outputDirURI);
}
}
+
+ /**
+ * Parallelizes the tar push job using Spark to distribute the work across
multiple nodes.
+ *
+ * @param pinotFSSpecs the list of Pinot file system specifications to be
registered
+ * @param segmentUris the list of segment URIs to be pushed
+ * @param pushParallelism the level of parallelism for the push job
+ */
+ public abstract void parallelizeTarPushJob(List<PinotFSSpec> pinotFSSpecs,
+ List<String> segmentUris, int pushParallelism, URI outputDirURI);
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
similarity index 68%
copy from
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
copy to
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
index e5080c4748..4188f5e4f4 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-base/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/common/BaseSparkSegmentUriPushJobRunner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.plugin.ingestion.batch.spark;
+package org.apache.pinot.plugin.ingestion.batch.spark.common;
import java.io.File;
import java.io.IOException;
@@ -24,7 +24,6 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -34,22 +33,18 @@ import
org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.VoidFunction;
-public class SparkSegmentUriPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+public abstract class BaseSparkSegmentUriPushJobRunner implements
IngestionJobRunner, Serializable {
- public SparkSegmentUriPushJobRunner() {
+ protected SegmentGenerationJobSpec _spec;
+
+ public BaseSparkSegmentUriPushJobRunner() {
}
- public SparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
+ public BaseSparkSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
init(spec);
}
@@ -92,8 +87,8 @@ public class SparkSegmentUriPushJobRunner implements
IngestionJobRunner, Seriali
for (String file : files) {
URI uri = URI.create(file);
if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
- URI updatedURI = SegmentPushUtils
- .generateSegmentTarURI(outputDirURI, uri,
_spec.getPushJobSpec().getSegmentUriPrefix(),
+ URI updatedURI =
+ SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri,
_spec.getPushJobSpec().getSegmentUriPrefix(),
_spec.getPushJobSpec().getSegmentUriSuffix());
segmentUris.add(updatedURI.toString());
}
@@ -111,26 +106,17 @@ public class SparkSegmentUriPushJobRunner implements
IngestionJobRunner, Seriali
throw new RuntimeException(e);
}
} else {
- JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
- JavaRDD<String> pathRDD = sparkContext.parallelize(segmentUris,
pushParallelism);
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentUri)
- throws Exception {
- try {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
- SegmentPushUtils.sendSegmentUris(_spec, Arrays.asList(segmentUri));
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
- }
- }
- });
+ parallelizeUriPushJob(pinotFSSpecs, segmentUris, pushParallelism);
}
}
+
+ /**
+ * Parallelizes the uri push job using Spark to distribute the work across
multiple nodes.
+ *
+ * @param pinotFSSpecs the list of Pinot file system specifications to be
registered
+ * @param segmentUris the list of segment URIs to be pushed
+ * @param pushParallelism the level of parallelism for the push job
+ */
+ public abstract void parallelizeUriPushJob(List<PinotFSSpec> pinotFSSpecs,
+ List<String> segmentUris, int pushParallelism);
}
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml
b/pinot-plugins/pinot-batch-ingestion/pom.xml
index 377e0eca25..564c76aaeb 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pom.xml
@@ -37,6 +37,7 @@
<modules>
<module>pinot-batch-ingestion-common</module>
+ <module>pinot-batch-ingestion-spark-base</module>
<module>pinot-batch-ingestion-spark-2.4</module>
<module>pinot-batch-ingestion-spark-3</module>
diff --git a/pom.xml b/pom.xml
index 053e5ae414..09a5adff74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -519,6 +519,11 @@
<artifactId>pinot-batch-ingestion-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-batch-ingestion-spark-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-batch-ingestion-spark-2.4</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]