This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch ingestion_into_spi
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 843eb686a215971bfbc8470331e5170b0e5be5da
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 8 17:05:59 2020 -0800

    Refactor pinot-batch-ingestion job specs to pinot-spi
---
 docs/batch_data_ingestion.rst                      | 15 ++++
 pinot-distribution/pinot-assembly.xml              |  1 -
 .../pom.xml                                        | 17 ++---
 .../standalone/SegmentGenerationJobRunner.java     | 87 ++++++++++++----------
 .../standalone}/SegmentGenerationTaskRunner.java   | 17 +++--
 .../standalone/SegmentTarPushJobRunner.java        | 26 +++++--
 .../standalone/SegmentUriPushJobRunner.java        | 26 +++++--
 .../segmentCreationAndTarPushJobSpec.yaml          |  5 ++
 .../segmentCreationAndUriPushJobSpec.yaml          |  5 ++
 .../src/main/resources/segmentCreationJobSpec.yaml |  5 ++
 .../src/main/resources/segmentTarPushJobSpec.yaml  |  5 ++
 .../src/main/resources/segmentUriPushJobSpec.yaml  |  5 ++
 pinot-plugins/pinot-batch-ingestion/pom.xml        |  2 +-
 pinot-spi/pom.xml                                  |  4 +
 .../pinot/spi/ingestion/IngestionJobLauncher.java  | 39 +++++++---
 .../spi/ingestion/runner/IngestionJobRunner.java   | 16 ++--
 .../pinot/spi/ingestion/spec}/Constants.java       |  2 +-
 .../spi/ingestion/spec/ExecutionFrameworkSpec.java | 76 +++++++++++++++++++
 .../spi/ingestion/spec}/PinotClusterSpec.java      |  2 +-
 .../pinot/spi/ingestion/spec}/PinotFSSpec.java     |  2 +-
 .../pinot/spi/ingestion/spec}/PushJobSpec.java     |  2 +-
 .../spi/ingestion/spec}/RecordReaderSpec.java      |  2 +-
 .../ingestion/spec}/SegmentGenerationJobSpec.java  | 22 +++++-
 .../ingestion/spec}/SegmentGenerationTaskSpec.java | 10 +--
 .../ingestion/spec}/SegmentNameGeneratorSpec.java  |  2 +-
 .../pinot/spi/ingestion/spec}/TableSpec.java       |  2 +-
 pinot-tools/pom.xml                                |  4 +-
 .../command/LaunchDataIngestionJobCommand.java     |  5 +-
 .../batch/airlineStats/ingestionJobSpec.yaml       | 15 ++++
 29 files changed, 308 insertions(+), 113 deletions(-)

diff --git a/docs/batch_data_ingestion.rst b/docs/batch_data_ingestion.rst
index 033c4b2..2e995a7 100644
--- a/docs/batch_data_ingestion.rst
+++ b/docs/batch_data_ingestion.rst
@@ -46,6 +46,21 @@ Below is an example (also located at 
`examples/batch/airlineStats/ingestionJobSp
 
 .. code-block:: none
 
+# executionFrameworkSpec: Defines the execution framework used to run ingestion jobs.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # segmentGenerationJobRunnerClassName: name of the segment generation job runner class, which implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+
+  # segmentTarPushJobRunnerClassName: name of the segment tar push job runner class, which implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+
+  # segmentUriPushJobRunnerClassName: name of the segment URI push job runner class, which implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
+
 # jobType: Pinot ingestion job type.
 # Supported job types are:
 #   'SegmentCreation'
diff --git a/pinot-distribution/pinot-assembly.xml 
b/pinot-distribution/pinot-assembly.xml
index a410657..a95e93b 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -119,7 +119,6 @@
         <exclude>**/pinot-stream-ingestion/pinot-stream-ingestion/**</exclude>
         <exclude>**/pinot-stream-ingestion/pinot-kafka-base/**</exclude>
         <exclude>**/pinot-batch-ingestion/pinot-batch-ingestion/**</exclude>
-        
<exclude>**/pinot-batch-ingestion/pinot-batch-ingestion-base/**</exclude>
         <exclude>**/pinot-batch-ingestion/pinot-ingestion-common/**</exclude>
         <exclude>**/pinot-batch-ingestion/v0_deprecated/**</exclude>
       </excludes>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml
similarity index 91%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml
index 80c06f8..449571a 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml
@@ -29,8 +29,8 @@
     <relativePath>..</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
-  <artifactId>pinot-batch-ingestion-base</artifactId>
-  <name>Pinot Batch Ingestion Base</name>
+  <artifactId>pinot-batch-ingestion-standalone</artifactId>
+  <name>Pinot Batch Ingestion Standalone</name>
   <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
@@ -38,25 +38,18 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-common</artifactId>
+      <artifactId>pinot-spi</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
+      <artifactId>pinot-common</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-parquet</artifactId>
+      <artifactId>pinot-core</artifactId>
       <version>${project.version}</version>
-      <scope>shaded</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.pinot</groupId>
-          <artifactId>pinot-core</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java
similarity index 94%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java
index 1df2e12..fcfd963 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.standalone;
+package org.apache.pinot.plugin.ingestion.standalone;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,31 +36,66 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.pinot.ingestion.common.Constants;
-import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner;
-import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec;
-import org.apache.pinot.ingestion.common.PinotClusterSpec;
-import org.apache.pinot.ingestion.common.PinotFSSpec;
-import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.spec.Constants;
+import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationTaskSpec;
 import org.apache.pinot.spi.utils.DataSize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentGenerationJobRunner {
+public class SegmentGenerationJobRunner implements IngestionJobRunner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
   private static final String OFFLINE = "OFFLINE";
-  private static final String LOCAL_PINOT_FS_SCHEME = "file";
 
   private SegmentGenerationJobSpec _spec;
 
+  public SegmentGenerationJobRunner() {
+  }
+
   public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  private static String generateSchemaURI(String controllerUri, String table) {
+    return String.format("%s/tables/%s/schema", controllerUri, table);
+  }
+
+  private static String generateTableConfigURI(String controllerUri, String 
table) {
+    return String.format("%s/tables/%s", controllerUri, table);
+  }
+
+  /**
+   * Generate a relative output directory path when `useRelativePath` flag is 
on.
+   * This method will compute the relative path based on `inputFile` and 
`baseInputDir`,
+   * then apply only the directory part of relative path to `outputDir`.
+   * E.g.
+   *    baseInputDir = "/path/to/input"
+   *    inputFile = "/path/to/input/a/b/c/d.avro"
+   *    outputDir = "/path/to/output"
+   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = 
/path/to/output/a/b/c
+   */
+  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI 
outputDir) {
+    URI relativePath = baseInputDir.relativize(inputFile);
+    Preconditions.checkState(relativePath.getPath().length() > 0 && 
!relativePath.equals(inputFile),
+        "Unable to extract out the relative path based on base input path: " + 
baseInputDir);
+    String outputDirStr = outputDir.toString();
+    outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir;
+    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
+    return relativeOutputURI;
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
     _spec = spec;
     if (_spec.getInputDirURI() == null) {
       throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' 
file");
@@ -96,34 +131,7 @@ public class SegmentGenerationJobRunner {
     }
   }
 
-  private static String generateSchemaURI(String controllerUri, String table) {
-    return String.format("%s/tables/%s/schema", controllerUri, table);
-  }
-
-  private static String generateTableConfigURI(String controllerUri, String 
table) {
-    return String.format("%s/tables/%s", controllerUri, table);
-  }
-
-  /**
-   * Generate a relative output directory path when `useRelativePath` flag is 
on.
-   * This method will compute the relative path based on `inputFile` and 
`baseInputDir`,
-   * then apply only the directory part of relative path to `outputDir`.
-   * E.g.
-   *    baseInputDir = "/path/to/input"
-   *    inputFile = "/path/to/input/a/b/c/d.avro"
-   *    outputDir = "/path/to/output"
-   *    getRelativeOutputPath(baseInputDir, inputFile, outputDir) = 
/path/to/output/a/b/c
-   */
-  public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI 
outputDir) {
-    URI relativePath = baseInputDir.relativize(inputFile);
-    Preconditions.checkState(relativePath.getPath().length() > 0 && 
!relativePath.equals(inputFile),
-        "Unable to extract out the relative path based on base input path: " + 
baseInputDir);
-    String outputDirStr = outputDir.toString();
-    outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir;
-    URI relativeOutputURI = outputDir.resolve(relativePath).resolve(".");
-    return relativeOutputURI;
-  }
-
+  @Override
   public void run()
       throws Exception {
     //init all file systems
@@ -194,7 +202,8 @@ public class SegmentGenerationJobRunner {
       for (int i = 0; i < filteredFiles.size(); i++) {
         URI inputFileURI = URI.create(filteredFiles.get(i));
         if (inputFileURI.getScheme() == null) {
-          inputFileURI = new URI(inputDirURI.getScheme(), 
inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
+          inputFileURI =
+              new URI(inputDirURI.getScheme(), 
inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
         }
         //copy input path to local
         File localInputDataFile = new File(localInputTempDir, new 
File(inputFileURI).getName());
@@ -206,7 +215,7 @@ public class SegmentGenerationJobRunner {
         taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
         taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
         taskSpec.setSchema(schema);
-        taskSpec.setTableConfig(tableConfig);
+        taskSpec.setTableConfig(tableConfig.toJsonNode());
         taskSpec.setSequenceId(i);
         
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java
similarity index 91%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java
index 49e2f4e..fc5338b 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.plugin.ingestion.standalone;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -31,6 +32,8 @@ import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentNameGeneratorSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.readers.RecordReader;
@@ -59,8 +62,8 @@ public class SegmentGenerationTaskRunner {
 
   public String run()
       throws Exception {
-    String tableName = _taskSpec.getTableConfig().getTableName();
-    TableConfig tableConfig = _taskSpec.getTableConfig();
+    TableConfig tableConfig = 
TableConfig.fromJsonConfig(_taskSpec.getTableConfig());
+    String tableName = tableConfig.getTableName();
     Schema schema = _taskSpec.getSchema();
 
     //init record reader config
@@ -100,9 +103,11 @@ public class SegmentGenerationTaskRunner {
     return segmentIndexCreationDriver.getSegmentName();
   }
 
-  private SegmentNameGenerator getSegmentNameGerator() {
-    String tableName = _taskSpec.getTableConfig().getTableName();
-    TableConfig tableConfig = _taskSpec.getTableConfig();
+  private SegmentNameGenerator getSegmentNameGerator()
+      throws IOException {
+    TableConfig tableConfig = 
TableConfig.fromJsonConfig(_taskSpec.getTableConfig());
+    String tableName = tableConfig.getTableName();
+
     Schema schema = _taskSpec.getSchema();
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = 
_taskSpec.getSegmentNameGeneratorSpec();
     if (segmentNameGeneratorSpec == null) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java
similarity index 91%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java
index 11a9997..cc58ee2 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.standalone;
+package org.apache.pinot.plugin.ingestion.standalone;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
@@ -29,32 +29,42 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
-import org.apache.pinot.ingestion.common.Constants;
-import org.apache.pinot.ingestion.common.PinotClusterSpec;
-import org.apache.pinot.ingestion.common.PinotFSSpec;
-import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.spec.Constants;
+import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentTarPushJobRunner {
+public class SegmentTarPushJobRunner implements IngestionJobRunner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentTarPushJobRunner.class);
 
   private SegmentGenerationJobSpec _spec;
 
+  public SegmentTarPushJobRunner() {
+  }
+
   public SegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
     _spec = spec;
   }
 
+  @Override
   public void run() {
     //init all file systems
     List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java
similarity index 91%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java
index 3bf7ed7..be54197 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.standalone;
+package org.apache.pinot.plugin.ingestion.standalone;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,35 +27,45 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
-import org.apache.pinot.ingestion.common.Constants;
-import org.apache.pinot.ingestion.common.PinotClusterSpec;
-import org.apache.pinot.ingestion.common.PinotFSSpec;
-import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.spec.Constants;
+import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
-import org.apache.pinot.spi.filesystem.PinotFSFactory;
-import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class SegmentUriPushJobRunner {
+public class SegmentUriPushJobRunner implements IngestionJobRunner {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentUriPushJobRunner.class);
 
   private SegmentGenerationJobSpec _spec;
 
+  public SegmentUriPushJobRunner() {
+  }
+
   public SegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
     _spec = spec;
     if (_spec.getPushJobSpec() == null) {
       throw new RuntimeException("Missing PushJobSpec");
     }
   }
 
+  @Override
   public void run() {
     //init all file systems
     List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
similarity index 78%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
index 18504dd..44d414a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
 jobType: SegmentCreationAndTarPush
 inputDirURI: 'file:///path/to/input'
 includeFileNamePattern: 'glob:**/*.parquet'
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
similarity index 78%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
index 56b1599..dc7d8da 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
 jobType: SegmentCreationAndUriPush
 inputDirURI: 'file:///path/to/input'
 includeFileNamePattern: 'glob:**/*.parquet'
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml
similarity index 79%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml
index b4c0c74..1752913 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
 jobType: SegmentCreation
 inputDirURI: 'file:///path/to/input'
 includeFileNamePattern: 'glob:**/*.parquet'
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml
similarity index 78%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml
index d0b0ab4..aaa3741 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
 jobType: SegmentTarPush
 inputDirURI: 'file:///path/to/input'
 includeFileNamePattern: 'glob:**/*.parquet'
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml
similarity index 78%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml
rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml
index 4741966..d25d1d4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml
@@ -17,6 +17,11 @@
 # under the License.
 #
 
+executionFrameworkSpec:
+  name: 'standalone'
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
 jobType: SegmentUriPush
 inputDirURI: 'file:///path/to/input'
 includeFileNamePattern: 'glob:**/*.parquet'
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pom.xml
index c70b432..177cd39 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pom.xml
@@ -38,7 +38,7 @@
   </properties>
 
   <modules>
-    <module>pinot-batch-ingestion-base</module>
+    <module>pinot-batch-ingestion-standalone</module>
     <module>v0_deprecated</module>
   </modules>
 
diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml
index aab8027..0f46775 100644
--- a/pinot-spi/pom.xml
+++ b/pinot-spi/pom.xml
@@ -115,6 +115,10 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java
similarity index 61%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java
index d943a2c..4211736 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java
@@ -16,22 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.standalone;
+package org.apache.pinot.spi.ingestion;
 
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.Reader;
 import java.io.StringWriter;
 import java.util.Arrays;
-import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.spec.ExecutionFrameworkSpec;
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
 
-public class StandaloneIngestionJobLauncher {
+public class IngestionJobLauncher {
 
-  public static final Logger LOGGER = 
LoggerFactory.getLogger(StandaloneIngestionJobLauncher.class);
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(IngestionJobLauncher.class);
 
   private static final String USAGE = "usage: [jobSpec.yaml]";
 
@@ -53,24 +56,26 @@ public class StandaloneIngestionJobLauncher {
       StringWriter sw = new StringWriter();
       yaml.dump(spec, sw);
       LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString());
+
+      ExecutionFrameworkSpec executionFramework = 
spec.getExecutionFrameworkSpec();
       PinotIngestionJobType jobType = 
PinotIngestionJobType.valueOf(spec.getJobType());
       switch (jobType) {
         case SegmentCreation:
-          new SegmentGenerationJobRunner(spec).run();
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
           break;
         case SegmentTarPush:
-          new SegmentTarPushJobRunner(spec).run();
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
           break;
         case SegmentUriPush:
-          new SegmentUriPushJobRunner(spec).run();
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
           break;
         case SegmentCreationAndTarPush:
-          new SegmentGenerationJobRunner(spec).run();
-          new SegmentTarPushJobRunner(spec).run();
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
           break;
         case SegmentCreationAndUriPush:
-          new SegmentGenerationJobRunner(spec).run();
-          new SegmentUriPushJobRunner(spec).run();
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
+          kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
           break;
         default:
           LOGGER.error("Unsupported job type - {}. Support job types: {}", 
spec.getJobType(),
@@ -80,7 +85,19 @@ public class StandaloneIngestionJobLauncher {
     }
   }
 
+  private static void kickoffIngestionJob(SegmentGenerationJobSpec spec, 
String ingestionJobRunnerClassName)
+      throws Exception {
+    IngestionJobRunner ingestionJobRunner =
+        PluginManager.get().createInstance(ingestionJobRunnerClassName);
+    ingestionJobRunner.init(spec);
+    ingestionJobRunner.run();
+  }
+
   enum PinotIngestionJobType {
     SegmentCreation, SegmentTarPush, SegmentUriPush, 
SegmentCreationAndTarPush, SegmentCreationAndUriPush,
   }
+
+  enum PinotIngestionExecutionFramework {
+    Standalone, Hadoop, Spark
+  }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java
similarity index 77%
copy from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
copy to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java
index e653771..a851a4b 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java
@@ -16,11 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.runner;
 
-public class Constants {
-  /**
-   * By default Pinot segments are compressed in 'tar.gz' format then pushed 
to controller.
-   */
-  public static final String TAR_GZ_FILE_EXT = ".tar.gz";
+import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec;
+
+
+public interface IngestionJobRunner {
+
+  void init(SegmentGenerationJobSpec jobSpec);
+
+  void run()
+      throws Exception;
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java
index e653771..1eb687a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 public class Constants {
   /**
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java
new file mode 100644
index 0000000..66cd193
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.ingestion.spec;
+
+/**
+ * ExecutionFrameworkSpec defines which job runner classes are used to execute each type of ingestion job.
+ */
+public class ExecutionFrameworkSpec {
+  /**
+   * The name of the execution framework, currently supports: Standalone.
+   */
+  private String _name;
+
+  /**
+   * Name of the segment generation job runner class, which must implement the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+   */
+  private String _segmentGenerationJobRunnerClassName;
+
+  /**
+   * Name of the segment tar push job runner class, which must implement the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+   */
+  private String _segmentTarPushJobRunnerClassName;
+
+  /**
+   * Name of the segment URI push job runner class, which must implement the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+   */
+  private String _segmentUriPushJobRunnerClassName;
+
+  public String getName() {
+    return _name;
+  }
+
+  public void setName(String name) {
+    _name = name;
+  }
+
+  public String getSegmentGenerationJobRunnerClassName() {
+    return _segmentGenerationJobRunnerClassName;
+  }
+
+  public void setSegmentGenerationJobRunnerClassName(String 
segmentGenerationJobRunnerClassName) {
+    _segmentGenerationJobRunnerClassName = segmentGenerationJobRunnerClassName;
+  }
+
+  public String getSegmentTarPushJobRunnerClassName() {
+    return _segmentTarPushJobRunnerClassName;
+  }
+
+  public void setSegmentTarPushJobRunnerClassName(String 
segmentTarPushJobRunnerClassName) {
+    _segmentTarPushJobRunnerClassName = segmentTarPushJobRunnerClassName;
+  }
+
+  public String getSegmentUriPushJobRunnerClassName() {
+    return _segmentUriPushJobRunnerClassName;
+  }
+
+  public void setSegmentUriPushJobRunnerClassName(String 
segmentUriPushJobRunnerClassName) {
+    _segmentUriPushJobRunnerClassName = segmentUriPushJobRunnerClassName;
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java
index 42c420b..4c41fad 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 /**
  * PinotClusterSpec defines the Pinot Cluster Access Point.
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java
index b6157d5..28b1e51 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 import java.util.Map;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java
index 7bd26c3..31631e5 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 /**
  * PushJobSpec defines segment push job related configuration
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java
index 2eac0b0..1acb79a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 import java.util.Map;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java
similarity index 92%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java
index 1dd4f8a..74f8214 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 import java.util.List;
 
@@ -28,6 +28,11 @@ import java.util.List;
 public class SegmentGenerationJobSpec {
 
   /**
+   * Execution framework on which this job will run.
+   */
+  private ExecutionFrameworkSpec _executionFrameworkSpec;
+
+  /**
    * Supported job types are:
    *  'SegmentCreation'
    *  'SegmentTarPush'
@@ -35,7 +40,7 @@ public class SegmentGenerationJobSpec {
    *  'SegmentCreationAndTarPush'
    *  'SegmentCreationAndUriPush'
    */
-  private String jobType;
+  private String _jobType;
 
   /**
    * Root directory of input data, expected to have scheme configured in 
PinotFS.
@@ -98,8 +103,16 @@ public class SegmentGenerationJobSpec {
    */
   private PushJobSpec _pushJobSpec;
 
+  public ExecutionFrameworkSpec getExecutionFrameworkSpec() {
+    return _executionFrameworkSpec;
+  }
+
+  public void setExecutionFrameworkSpec(ExecutionFrameworkSpec 
executionFrameworkSpec) {
+    _executionFrameworkSpec = executionFrameworkSpec;
+  }
+
   public String getJobType() {
-    return jobType;
+    return _jobType;
   }
 
   /**
@@ -112,7 +125,7 @@ public class SegmentGenerationJobSpec {
    * @param jobType
    */
   public void setJobType(String jobType) {
-    this.jobType = jobType;
+    _jobType = jobType;
   }
 
   public String getInputDirURI() {
@@ -218,6 +231,7 @@ public class SegmentGenerationJobSpec {
   public void setPushJobSpec(PushJobSpec pushJobSpec) {
     _pushJobSpec = pushJobSpec;
   }
+
 }
 
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java
similarity index 92%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java
index ca85925..8652304 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
-import org.apache.pinot.common.config.TableConfig;
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -31,7 +31,7 @@ public class SegmentGenerationTaskSpec {
   /**
    * Table config to create segment
    */
-  private TableConfig _tableConfig;
+  private JsonNode _tableConfig;
 
   /**
    * Table schema
@@ -63,11 +63,11 @@ public class SegmentGenerationTaskSpec {
    */
   private int _sequenceId;
 
-  public TableConfig getTableConfig() {
+  public JsonNode getTableConfig() {
     return _tableConfig;
   }
 
-  public void setTableConfig(TableConfig tableConfig) {
+  public void setTableConfig(JsonNode tableConfig) {
     _tableConfig = tableConfig;
   }
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java
similarity index 97%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java
index c3b0893..860e55f 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
 b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java
similarity index 97%
rename from 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
rename to 
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java
index d9a7994..418391d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.ingestion.common;
+package org.apache.pinot.spi.ingestion.spec;
 
 /**
  * TableSpec defines table name and where to fetch corresponding table config 
and table schema.
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 8faca9a..617c5ad 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -66,7 +66,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-batch-ingestion-base</artifactId>
+      <artifactId>pinot-batch-ingestion-standalone</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -293,7 +293,7 @@
               </jvmSettings>
             </program>
             <program>
-              
<mainClass>org.apache.pinot.ingestion.standalone.StandaloneIngestionJobLauncher</mainClass>
+              
<mainClass>org.apache.pinot.spi.ingestion.IngestionJobLauncher</mainClass>
               <name>pinot-ingestion-job</name>
               <jvmSettings>
                 <initialMemorySize>1G</initialMemorySize>
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
index 004513f..c139fe2 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.tools.admin.command;
 
-import org.apache.pinot.ingestion.standalone.StandaloneIngestionJobLauncher;
-import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.ingestion.IngestionJobLauncher;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -48,7 +47,7 @@ public class LaunchDataIngestionJobCommand extends 
AbstractBaseAdminCommand impl
   public boolean execute()
       throws Exception {
     try {
-      StandaloneIngestionJobLauncher.main(new String[]{_jobSpecFile});
+      IngestionJobLauncher.main(new String[]{_jobSpecFile});
     } catch (Exception e) {
       LOGGER.error("Got exception to kick off standalone data ingestion job 
-", e);
       throw e;
diff --git 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
index 463531d..19fc29e 100644
--- 
a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
+++ 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml
@@ -17,6 +17,21 @@
 # under the License.
 #
 
+# executionFrameworkSpec: Defines the job runner classes used to execute ingestion jobs.
+executionFrameworkSpec:
+
+  # name: execution framework name
+  name: 'standalone'
+
+  # segmentGenerationJobRunnerClassName: name of a class that implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentGenerationJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner'
+
+  # segmentTarPushJobRunnerClassName: name of a class that implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentTarPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner'
+
+  # segmentUriPushJobRunnerClassName: name of a class that implements the 
org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface.
+  segmentUriPushJobRunnerClassName: 
'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner'
+
 # jobType: Pinot ingestion job type.
 # Supported job types are:
 #   'SegmentCreation'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to