This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 63bf8dcc8cf Centralize batch ingestion job spec constants into
SegmentGenerationJobUtils (#18413)
63bf8dcc8cf is described below
commit 63bf8dcc8cfe71cc318673e398b98d938c400833
Author: Akanksha kedia <[email protected]>
AuthorDate: Tue Jun 23 13:07:59 2026 +0530
Centralize batch ingestion job spec constants into
SegmentGenerationJobUtils (#18413)
Move duplicated string constants (SEGMENT_GENERATION_JOB_SPEC,
DEPENDENCY_JAR_DIR, STAGING_DIR) from HadoopSegmentGenerationJobRunner
and SparkSegmentGenerationJobRunner into the shared
SegmentGenerationJobUtils class in pinot-batch-ingestion-common.
This removes the risk of silent drift between the Hadoop and Spark
runners, where the same config keys ("dependencyJarDir", "stagingDir")
were independently declared as private literals under different
constant names. The Hadoop runner's public SEGMENT_GENERATION_JOB_SPEC
field is retained as a delegating alias for backward compatibility.
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
.../ingestion/batch/common/SegmentGenerationJobUtils.java | 7 +++++++
.../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 15 +++++++--------
.../batch/spark3/SparkSegmentGenerationJobRunner.java | 10 +++++-----
3 files changed, 19 insertions(+), 13 deletions(-)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
index 816bef6232e..1129bcce128 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -47,6 +47,13 @@ public class SegmentGenerationJobUtils implements
Serializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGenerationJobUtils.class);
+ // Key used to pass the serialized SegmentGenerationJobSpec through a
distributed job framework
+ public static final String SEGMENT_GENERATION_JOB_SPEC =
"segmentGenerationJobSpec";
+
+ // Field names in the executionFrameworkSpec/extraConfigs section shared
across ingestion frameworks
+ public static final String DEPENDENCY_JAR_DIR = "dependencyJarDir";
+ public static final String STAGING_DIR = "stagingDir";
+
/**
* Always use local directory sequence id unless explicitly config:
"use.global.directory.sequence.id".
*
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 918ecb99da6..de173ff93d9 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -67,13 +67,10 @@ import static
org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY
public class HadoopSegmentGenerationJobRunner extends Configured implements
IngestionJobRunner, Serializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);
- public static final String SEGMENT_GENERATION_JOB_SPEC =
"segmentGenerationJobSpec";
+ // Kept for backward compatibility; callers should prefer
SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC
+ public static final String SEGMENT_GENERATION_JOB_SPEC =
SegmentGenerationJobUtils.SEGMENT_GENERATION_JOB_SPEC;
- // Field names in job spec's executionFrameworkSpec/extraConfigs section
- private static final String DEPS_JAR_DIR_FIELD = "dependencyJarDir";
- private static final String STAGING_DIR_FIELD = "stagingDir";
-
- // Sub-dirs under directory specified by STAGING_DIR_FIELD
+ // Sub-dirs under the staging directory
private static final String SEGMENT_TAR_SUBDIR_NAME = "segmentTar";
private static final String DEPS_JAR_SUBDIR_NAME = "dependencyJars";
@@ -156,7 +153,8 @@ public class HadoopSegmentGenerationJobRunner extends
Configured implements Inge
outputDirFS.mkdir(outputDirURI);
//Get staging directory for temporary output pinot segments
- String stagingDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR_FIELD);
+ String stagingDir =
+
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir
under 'executionFrameworkSpec.extraConfigs'");
URI stagingDirURI = URI.create(stagingDir);
if (stagingDirURI.getScheme() == null) {
@@ -247,7 +245,8 @@ public class HadoopSegmentGenerationJobRunner extends
Configured implements Inge
packPluginsToDistributedCache(job, outputDirFS, stagingDirURI);
// Add dependency jars, if we're provided with a directory containing
these.
- String dependencyJarsSrcDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR_FIELD);
+ String dependencyJarsSrcDir =
+
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR);
if (dependencyJarsSrcDir != null) {
Path dependencyJarsDestPath = new Path(stagingDirURI.toString(),
DEPS_JAR_SUBDIR_NAME);
addJarsToDistributedCache(job, new File(dependencyJarsSrcDir),
outputDirFS, dependencyJarsDestPath.toUri(),
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
index d32fb861fea..fccf20175ca 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
@@ -67,8 +67,6 @@ import static
org.apache.pinot.spi.plugin.PluginManager.PLUGINS_INCLUDE_PROPERTY
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner,
Serializable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SparkSegmentGenerationJobRunner.class);
- private static final String DEPS_JAR_DIR = "dependencyJarDir";
- private static final String STAGING_DIR = "stagingDir";
private SegmentGenerationJobSpec _spec;
@@ -155,7 +153,8 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
outputDirFS.mkdir(outputDirURI);
//Get staging directory for temporary output pinot segments
- String stagingDir =
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
+ String stagingDir =
+
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.STAGING_DIR);
URI stagingDirURI = null;
if (stagingDir != null) {
stagingDirURI = URI.create(stagingDir);
@@ -178,9 +177,10 @@ public class SparkSegmentGenerationJobRunner implements
IngestionJobRunner, Seri
packPluginsToDistributedCache(sparkContext);
// Add dependency jars
- if
(_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR))
{
+ if (_spec.getExecutionFrameworkSpec().getExtraConfigs()
+ .containsKey(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR)) {
addDepsJarToDistributedCache(sparkContext,
-
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
+
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(SegmentGenerationJobUtils.DEPENDENCY_JAR_DIR));
}
List<String> pathAndIdxList = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]