This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch ingestion_into_spi in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 843eb686a215971bfbc8470331e5170b0e5be5da Author: Xiang Fu <[email protected]> AuthorDate: Wed Jan 8 17:05:59 2020 -0800 Refactor pinot-batch-ingestion job specs to pinot-spi --- docs/batch_data_ingestion.rst | 15 ++++ pinot-distribution/pinot-assembly.xml | 1 - .../pom.xml | 17 ++--- .../standalone/SegmentGenerationJobRunner.java | 87 ++++++++++++---------- .../standalone}/SegmentGenerationTaskRunner.java | 17 +++-- .../standalone/SegmentTarPushJobRunner.java | 26 +++++-- .../standalone/SegmentUriPushJobRunner.java | 26 +++++-- .../segmentCreationAndTarPushJobSpec.yaml | 5 ++ .../segmentCreationAndUriPushJobSpec.yaml | 5 ++ .../src/main/resources/segmentCreationJobSpec.yaml | 5 ++ .../src/main/resources/segmentTarPushJobSpec.yaml | 5 ++ .../src/main/resources/segmentUriPushJobSpec.yaml | 5 ++ pinot-plugins/pinot-batch-ingestion/pom.xml | 2 +- pinot-spi/pom.xml | 4 + .../pinot/spi/ingestion/IngestionJobLauncher.java | 39 +++++++--- .../spi/ingestion/runner/IngestionJobRunner.java | 16 ++-- .../pinot/spi/ingestion/spec}/Constants.java | 2 +- .../spi/ingestion/spec/ExecutionFrameworkSpec.java | 76 +++++++++++++++++++ .../spi/ingestion/spec}/PinotClusterSpec.java | 2 +- .../pinot/spi/ingestion/spec}/PinotFSSpec.java | 2 +- .../pinot/spi/ingestion/spec}/PushJobSpec.java | 2 +- .../spi/ingestion/spec}/RecordReaderSpec.java | 2 +- .../ingestion/spec}/SegmentGenerationJobSpec.java | 22 +++++- .../ingestion/spec}/SegmentGenerationTaskSpec.java | 10 +-- .../ingestion/spec}/SegmentNameGeneratorSpec.java | 2 +- .../pinot/spi/ingestion/spec}/TableSpec.java | 2 +- pinot-tools/pom.xml | 4 +- .../command/LaunchDataIngestionJobCommand.java | 5 +- .../batch/airlineStats/ingestionJobSpec.yaml | 15 ++++ 29 files changed, 308 insertions(+), 113 deletions(-) diff --git a/docs/batch_data_ingestion.rst b/docs/batch_data_ingestion.rst index 033c4b2..2e995a7 100644 --- a/docs/batch_data_ingestion.rst +++ b/docs/batch_data_ingestion.rst @@ -46,6 +46,21 @@ Below is an example (also 
located at `examples/batch/airlineStats/ingestionJobSp .. code-block:: none +# executionFrameworkSpec: Defines the execution framework, and its job runner classes, used to run the ingestion jobs. +executionFrameworkSpec: + + # name: execution framework name + name: 'standalone' + + # segmentGenerationJobRunnerClassName: name of a class implementing the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface, used to run segment generation jobs. + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + + # segmentTarPushJobRunnerClassName: name of a class implementing the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface, used to run segment tar push jobs. + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + + # segmentUriPushJobRunnerClassName: name of a class implementing the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner interface, used to run segment URI push jobs. + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' + # jobType: Pinot ingestion job type. # Supported job types are: # 'SegmentCreation' diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml index a410657..a95e93b 100644 --- a/pinot-distribution/pinot-assembly.xml +++ b/pinot-distribution/pinot-assembly.xml @@ -119,7 +119,6 @@ <exclude>**/pinot-stream-ingestion/pinot-stream-ingestion/**</exclude> <exclude>**/pinot-stream-ingestion/pinot-kafka-base/**</exclude> <exclude>**/pinot-batch-ingestion/pinot-batch-ingestion/**</exclude> - <exclude>**/pinot-batch-ingestion/pinot-batch-ingestion-base/**</exclude> <exclude>**/pinot-batch-ingestion/pinot-ingestion-common/**</exclude> <exclude>**/pinot-batch-ingestion/v0_deprecated/**</exclude> </excludes> diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml similarity index 91% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml index 80c06f8..449571a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/pom.xml @@ -29,8 +29,8 @@ <relativePath>..</relativePath> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>pinot-batch-ingestion-base</artifactId> - <name>Pinot Batch Ingestion Base</name> + <artifactId>pinot-batch-ingestion-standalone</artifactId> + <name>Pinot Batch Ingestion Standalone</name> <url>https://pinot.apache.org/</url> <properties> <pinot.root>${basedir}/../../..</pinot.root> @@ -38,25 +38,18 @@ <dependencies> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-common</artifactId> + <artifactId>pinot-spi</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> + <artifactId>pinot-common</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-parquet</artifactId> + <artifactId>pinot-core</artifactId> <version>${project.version}</version> - <scope>shaded</scope> - <exclusions> - <exclusion> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-core</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java similarity index 94% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java rename to 
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java index 1df2e12..fcfd963 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationJobRunner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.standalone; +package org.apache.pinot.plugin.ingestion.standalone; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -36,31 +36,66 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.pinot.ingestion.common.Constants; -import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner; -import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec; -import org.apache.pinot.ingestion.common.PinotClusterSpec; -import org.apache.pinot.ingestion.common.PinotFSSpec; -import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.spec.Constants; +import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec; +import 
org.apache.pinot.spi.ingestion.spec.SegmentGenerationTaskSpec; import org.apache.pinot.spi.utils.DataSize; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentGenerationJobRunner { +public class SegmentGenerationJobRunner implements IngestionJobRunner { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobRunner.class); private static final String OFFLINE = "OFFLINE"; - private static final String LOCAL_PINOT_FS_SCHEME = "file"; private SegmentGenerationJobSpec _spec; + public SegmentGenerationJobRunner() { + } + public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) { + init(spec); + } + + private static String generateSchemaURI(String controllerUri, String table) { + return String.format("%s/tables/%s/schema", controllerUri, table); + } + + private static String generateTableConfigURI(String controllerUri, String table) { + return String.format("%s/tables/%s", controllerUri, table); + } + + /** + * Generate a relative output directory path when `useRelativePath` flag is on. + * This method will compute the relative path based on `inputFile` and `baseInputDir`, + * then apply only the directory part of relative path to `outputDir`. + * E.g. + * baseInputDir = "/path/to/input" + * inputFile = "/path/to/input/a/b/c/d.avro" + * outputDir = "/path/to/output" + * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c + */ + public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { + URI relativePath = baseInputDir.relativize(inputFile); + Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), + "Unable to extract out the relative path based on base input path: " + baseInputDir); + String outputDirStr = outputDir.toString(); + outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir; + URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); + return relativeOutputURI; + } + + @Override + public void init(SegmentGenerationJobSpec spec) { _spec = spec; if (_spec.getInputDirURI() == null) { throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file"); @@ -96,34 +131,7 @@ public class SegmentGenerationJobRunner { } } - private static String generateSchemaURI(String controllerUri, String table) { - return String.format("%s/tables/%s/schema", controllerUri, table); - } - - private static String generateTableConfigURI(String controllerUri, String table) { - return String.format("%s/tables/%s", controllerUri, table); - } - - /** - * Generate a relative output directory path when `useRelativePath` flag is on. - * This method will compute the relative path based on `inputFile` and `baseInputDir`, - * then apply only the directory part of relative path to `outputDir`. - * E.g. - * baseInputDir = "/path/to/input" - * inputFile = "/path/to/input/a/b/c/d.avro" - * outputDir = "/path/to/output" - * getRelativeOutputPath(baseInputDir, inputFile, outputDir) = /path/to/output/a/b/c - */ - public static URI getRelativeOutputPath(URI baseInputDir, URI inputFile, URI outputDir) { - URI relativePath = baseInputDir.relativize(inputFile); - Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile), - "Unable to extract out the relative path based on base input path: " + baseInputDir); - String outputDirStr = outputDir.toString(); - outputDir = !outputDirStr.endsWith("/") ? 
URI.create(outputDirStr.concat("/")) : outputDir; - URI relativeOutputURI = outputDir.resolve(relativePath).resolve("."); - return relativeOutputURI; - } - + @Override public void run() throws Exception { //init all file systems @@ -194,7 +202,8 @@ public class SegmentGenerationJobRunner { for (int i = 0; i < filteredFiles.size(); i++) { URI inputFileURI = URI.create(filteredFiles.get(i)); if (inputFileURI.getScheme() == null) { - inputFileURI = new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); + inputFileURI = + new URI(inputDirURI.getScheme(), inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment()); } //copy input path to local File localInputDataFile = new File(localInputTempDir, new File(inputFileURI).getName()); @@ -206,7 +215,7 @@ public class SegmentGenerationJobRunner { taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath()); taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec()); taskSpec.setSchema(schema); - taskSpec.setTableConfig(tableConfig); + taskSpec.setTableConfig(tableConfig.toJsonNode()); taskSpec.setSequenceId(i); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java similarity index 91% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java index 49e2f4e..fc5338b 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentGenerationTaskRunner.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.plugin.ingestion.standalone; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import java.io.File; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig; @@ -31,6 +32,8 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator; import org.apache.pinot.core.segment.name.SegmentNameGenerator; import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationTaskSpec; +import org.apache.pinot.spi.ingestion.spec.SegmentNameGeneratorSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.readers.RecordReader; @@ -59,8 +62,8 @@ public class SegmentGenerationTaskRunner { public String run() throws Exception { - String tableName = _taskSpec.getTableConfig().getTableName(); - TableConfig tableConfig = _taskSpec.getTableConfig(); + TableConfig tableConfig = TableConfig.fromJsonConfig(_taskSpec.getTableConfig()); + String tableName = tableConfig.getTableName(); Schema schema = _taskSpec.getSchema(); //init record reader config @@ -100,9 +103,11 @@ public class SegmentGenerationTaskRunner { return segmentIndexCreationDriver.getSegmentName(); } - private SegmentNameGenerator 
getSegmentNameGerator() { - String tableName = _taskSpec.getTableConfig().getTableName(); - TableConfig tableConfig = _taskSpec.getTableConfig(); + private SegmentNameGenerator getSegmentNameGerator() + throws IOException { + TableConfig tableConfig = TableConfig.fromJsonConfig(_taskSpec.getTableConfig()); + String tableName = tableConfig.getTableName(); + Schema schema = _taskSpec.getSchema(); SegmentNameGeneratorSpec segmentNameGeneratorSpec = _taskSpec.getSegmentNameGeneratorSpec(); if (segmentNameGeneratorSpec == null) { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java similarity index 91% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java index 11a9997..cc58ee2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentTarPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentTarPushJobRunner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.ingestion.standalone; +package org.apache.pinot.plugin.ingestion.standalone; import com.google.common.base.Preconditions; import java.io.File; @@ -29,32 +29,42 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; -import org.apache.pinot.ingestion.common.Constants; -import org.apache.pinot.ingestion.common.PinotClusterSpec; -import org.apache.pinot.ingestion.common.PinotFSSpec; -import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.spec.Constants; +import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.apache.pinot.spi.filesystem.PinotFSFactory; -import org.apache.pinot.spi.filesystem.PinotFS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentTarPushJobRunner { +public class SegmentTarPushJobRunner implements IngestionJobRunner { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentTarPushJobRunner.class); private SegmentGenerationJobSpec _spec; + public SegmentTarPushJobRunner() { + } + public SegmentTarPushJobRunner(SegmentGenerationJobSpec spec) { + init(spec); + } + + @Override + public void init(SegmentGenerationJobSpec spec) { _spec = spec; } + 
@Override public void run() { //init all file systems List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java similarity index 91% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java index 3bf7ed7..be54197 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/SegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/standalone/SegmentUriPushJobRunner.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.ingestion.standalone; +package org.apache.pinot.plugin.ingestion.standalone; import java.io.File; import java.io.IOException; @@ -27,35 +27,45 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.MapConfiguration; -import org.apache.pinot.ingestion.common.Constants; -import org.apache.pinot.ingestion.common.PinotClusterSpec; -import org.apache.pinot.ingestion.common.PinotFSSpec; -import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.spec.Constants; +import org.apache.pinot.spi.ingestion.spec.PinotClusterSpec; +import org.apache.pinot.spi.ingestion.spec.PinotFSSpec; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.apache.pinot.spi.filesystem.PinotFSFactory; -import org.apache.pinot.spi.filesystem.PinotFS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentUriPushJobRunner { +public class SegmentUriPushJobRunner implements IngestionJobRunner { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUriPushJobRunner.class); private SegmentGenerationJobSpec _spec; + public SegmentUriPushJobRunner() { + } + public SegmentUriPushJobRunner(SegmentGenerationJobSpec spec) { + init(spec); + } + + @Override + public void init(SegmentGenerationJobSpec spec) { _spec = spec; if 
(_spec.getPushJobSpec() == null) { throw new RuntimeException("Missing PushJobSpec"); } } + @Override public void run() { //init all file systems List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml similarity index 78% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml index 18504dd..44d414a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndTarPushJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml @@ -17,6 +17,11 @@ # under the License. 
# +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' jobType: SegmentCreationAndTarPush inputDirURI: 'file:///path/to/input' includeFileNamePattern: 'glob:**/*.parquet' diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml similarity index 78% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml index 56b1599..dc7d8da 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationAndUriPushJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndUriPushJobSpec.yaml @@ -17,6 +17,11 @@ # under the License. 
# +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' jobType: SegmentCreationAndUriPush inputDirURI: 'file:///path/to/input' includeFileNamePattern: 'glob:**/*.parquet' diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml similarity index 79% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml index b4c0c74..1752913 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentCreationJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationJobSpec.yaml @@ -17,6 +17,11 @@ # under the License. 
# +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' jobType: SegmentCreation inputDirURI: 'file:///path/to/input' includeFileNamePattern: 'glob:**/*.parquet' diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml similarity index 78% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml index d0b0ab4..aaa3741 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentTarPushJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentTarPushJobSpec.yaml @@ -17,6 +17,11 @@ # under the License. 
# +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' jobType: SegmentTarPush inputDirURI: 'file:///path/to/input' includeFileNamePattern: 'glob:**/*.parquet' diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml similarity index 78% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml index 4741966..d25d1d4 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/resources/segmentUriPushJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentUriPushJobSpec.yaml @@ -17,6 +17,11 @@ # under the License. 
# +executionFrameworkSpec: + name: 'standalone' + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' jobType: SegmentUriPush inputDirURI: 'file:///path/to/input' includeFileNamePattern: 'glob:**/*.parquet' diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml b/pinot-plugins/pinot-batch-ingestion/pom.xml index c70b432..177cd39 100644 --- a/pinot-plugins/pinot-batch-ingestion/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/pom.xml @@ -38,7 +38,7 @@ </properties> <modules> - <module>pinot-batch-ingestion-base</module> + <module>pinot-batch-ingestion-standalone</module> <module>v0_deprecated</module> </modules> diff --git a/pinot-spi/pom.xml b/pinot-spi/pom.xml index aab8027..0f46775 100644 --- a/pinot-spi/pom.xml +++ b/pinot-spi/pom.xml @@ -115,6 +115,10 @@ <artifactId>jackson-annotations</artifactId> </dependency> <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java similarity index 61% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java index d943a2c..4211736 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/IngestionJobLauncher.java @@ -16,22 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.standalone; +package org.apache.pinot.spi.ingestion; import java.io.BufferedReader; import java.io.FileReader; import java.io.Reader; import java.io.StringWriter; import java.util.Arrays; -import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec; +import org.apache.pinot.spi.ingestion.runner.IngestionJobRunner; +import org.apache.pinot.spi.ingestion.spec.ExecutionFrameworkSpec; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.plugin.PluginManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; -public class StandaloneIngestionJobLauncher { +public class IngestionJobLauncher { - public static final Logger LOGGER = LoggerFactory.getLogger(StandaloneIngestionJobLauncher.class); + public static final Logger LOGGER = LoggerFactory.getLogger(IngestionJobLauncher.class); private static final String USAGE = "usage: [jobSpec.yaml]"; @@ -53,24 +56,26 @@ public class StandaloneIngestionJobLauncher { StringWriter sw = new StringWriter(); yaml.dump(spec, sw); LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString()); + + ExecutionFrameworkSpec executionFramework = spec.getExecutionFrameworkSpec(); PinotIngestionJobType jobType = PinotIngestionJobType.valueOf(spec.getJobType()); switch (jobType) { case SegmentCreation: - new SegmentGenerationJobRunner(spec).run(); + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); break; case SegmentTarPush: - new SegmentTarPushJobRunner(spec).run(); + kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName()); break; case SegmentUriPush: - new SegmentUriPushJobRunner(spec).run(); + kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); break; case SegmentCreationAndTarPush: - new SegmentGenerationJobRunner(spec).run(); - new SegmentTarPushJobRunner(spec).run(); + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); + kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName()); break; case SegmentCreationAndUriPush: - new SegmentGenerationJobRunner(spec).run(); - new SegmentUriPushJobRunner(spec).run(); + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); + kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); break; default: LOGGER.error("Unsupported job type - {}. Support job types: {}", spec.getJobType(), @@ -80,7 +85,19 @@ public class StandaloneIngestionJobLauncher { } } + private static void kickoffIngestionJob(SegmentGenerationJobSpec spec, String ingestionJobRunnerClassName) + throws Exception { + IngestionJobRunner ingestionJobRunner = + PluginManager.get().createInstance(ingestionJobRunnerClassName); + ingestionJobRunner.init(spec); + ingestionJobRunner.run(); + } + enum PinotIngestionJobType { SegmentCreation, SegmentTarPush, SegmentUriPush, SegmentCreationAndTarPush, SegmentCreationAndUriPush, } + + enum PinotIngestionExecutionFramework { + Standalone, Hadoop, Spark + } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java similarity index 77% copy from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java copy to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java index 
e653771..a851a4b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/runner/IngestionJobRunner.java @@ -16,11 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.runner; -public class Constants { - /** - * By default Pinot segments are compressed in 'tar.gz' format then pushed to controller. - */ - public static final String TAR_GZ_FILE_EXT = ".tar.gz"; +import org.apache.pinot.spi.ingestion.spec.SegmentGenerationJobSpec; + + +public interface IngestionJobRunner { + + void init(SegmentGenerationJobSpec jobSpec); + + void run() + throws Exception; } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java similarity index 95% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java index e653771..1eb687a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/Constants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/Constants.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; public class Constants { /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java new file mode 100644 index 0000000..66cd193 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/ExecutionFrameworkSpec.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.ingestion.spec; + +/** + * ExecutionFrameworkSpec defines the execution framework and the runner classes used to run ingestion jobs. + */ +public class ExecutionFrameworkSpec { + /** + * The name of the execution framework, currently supports: Standalone. + */ + private String _name; + + /** + * Class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment generation jobs. + */ + private String _segmentGenerationJobRunnerClassName; + + /** + * Class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment tar push jobs.
+ */ + private String _segmentTarPushJobRunnerClassName; + + /** + * Class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment URI push jobs. + */ + private String _segmentUriPushJobRunnerClassName; + + public String getName() { + return _name; + } + + public void setName(String name) { + _name = name; + } + + public String getSegmentGenerationJobRunnerClassName() { + return _segmentGenerationJobRunnerClassName; + } + + public void setSegmentGenerationJobRunnerClassName(String segmentGenerationJobRunnerClassName) { + _segmentGenerationJobRunnerClassName = segmentGenerationJobRunnerClassName; + } + + public String getSegmentTarPushJobRunnerClassName() { + return _segmentTarPushJobRunnerClassName; + } + + public void setSegmentTarPushJobRunnerClassName(String segmentTarPushJobRunnerClassName) { + _segmentTarPushJobRunnerClassName = segmentTarPushJobRunnerClassName; + } + + public String getSegmentUriPushJobRunnerClassName() { + return _segmentUriPushJobRunnerClassName; + } + + public void setSegmentUriPushJobRunnerClassName(String segmentUriPushJobRunnerClassName) { + _segmentUriPushJobRunnerClassName = segmentUriPushJobRunnerClassName; + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java similarity index 96% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java index 42c420b..4c41fad 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotClusterSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and
limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; /** * PinotClusterSpec defines the Pinot Cluster Access Point. diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java index b6157d5..28b1e51 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PinotFSSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java index 7bd26c3..31631e5 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/PushJobSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; /** * PushJobSpec defines segment push job related configuration diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java similarity index 98% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java index 2eac0b0..1acb79a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/RecordReaderSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java similarity index 92% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java index 1dd4f8a..74f8214 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationJobSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the 
License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; import java.util.List; @@ -28,6 +28,11 @@ import java.util.List; public class SegmentGenerationJobSpec { /** + * Execution framework which this job will be running. + */ + private ExecutionFrameworkSpec _executionFrameworkSpec; + + /** * Supported job types are: * 'SegmentCreation' * 'SegmentTarPush' @@ -35,7 +40,7 @@ public class SegmentGenerationJobSpec { * 'SegmentCreationAndTarPush' * 'SegmentCreationAndUriPush' */ - private String jobType; + private String _jobType; /** * Root directory of input data, expected to have scheme configured in PinotFS. @@ -98,8 +103,16 @@ public class SegmentGenerationJobSpec { */ private PushJobSpec _pushJobSpec; + public ExecutionFrameworkSpec getExecutionFrameworkSpec() { + return _executionFrameworkSpec; + } + + public void setExecutionFrameworkSpec(ExecutionFrameworkSpec executionFrameworkSpec) { + _executionFrameworkSpec = executionFrameworkSpec; + } + public String getJobType() { - return jobType; + return _jobType; } /** @@ -112,7 +125,7 @@ public class SegmentGenerationJobSpec { * @param jobType */ public void setJobType(String jobType) { - this.jobType = jobType; + _jobType = jobType; } public String getInputDirURI() { @@ -218,6 +231,7 @@ public class SegmentGenerationJobSpec { public void setPushJobSpec(PushJobSpec pushJobSpec) { _pushJobSpec = pushJobSpec; } + } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java similarity index 92% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java index ca85925..8652304 100644 --- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentGenerationTaskSpec.java @@ -16,9 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; -import org.apache.pinot.common.config.TableConfig; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.pinot.spi.data.Schema; @@ -31,7 +31,7 @@ public class SegmentGenerationTaskSpec { /** * Table config to create segment */ - private TableConfig _tableConfig; + private JsonNode _tableConfig; /** * Table schema @@ -63,11 +63,11 @@ public class SegmentGenerationTaskSpec { */ private int _sequenceId; - public TableConfig getTableConfig() { + public JsonNode getTableConfig() { return _tableConfig; } - public void setTableConfig(TableConfig tableConfig) { + public void setTableConfig(JsonNode tableConfig) { _tableConfig = tableConfig; } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java index c3b0893..860e55f 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/SegmentNameGeneratorSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; import java.util.HashMap; import java.util.Map; diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java index d9a7994..418391d 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-base/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/spec/TableSpec.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.ingestion.common; +package org.apache.pinot.spi.ingestion.spec; /** * TableSpec defines table name and where to fetch corresponding table config and table schema. 
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index 8faca9a..617c5ad 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -66,7 +66,7 @@ </dependency> <dependency> <groupId>org.apache.pinot</groupId> - <artifactId>pinot-batch-ingestion-base</artifactId> + <artifactId>pinot-batch-ingestion-standalone</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -293,7 +293,7 @@ </jvmSettings> </program> <program> - <mainClass>org.apache.pinot.ingestion.standalone.StandaloneIngestionJobLauncher</mainClass> + <mainClass>org.apache.pinot.spi.ingestion.IngestionJobLauncher</mainClass> <name>pinot-ingestion-job</name> <jvmSettings> <initialMemorySize>1G</initialMemorySize> diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java index 004513f..c139fe2 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchDataIngestionJobCommand.java @@ -18,8 +18,7 @@ */ package org.apache.pinot.tools.admin.command; -import org.apache.pinot.ingestion.standalone.StandaloneIngestionJobLauncher; -import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.ingestion.IngestionJobLauncher; import org.apache.pinot.tools.Command; import org.kohsuke.args4j.Option; import org.slf4j.Logger; @@ -48,7 +47,7 @@ public class LaunchDataIngestionJobCommand extends AbstractBaseAdminCommand impl public boolean execute() throws Exception { try { - StandaloneIngestionJobLauncher.main(new String[]{_jobSpecFile}); + IngestionJobLauncher.main(new String[]{_jobSpecFile}); } catch (Exception e) { LOGGER.error("Got exception to kick off standalone data ingestion job -", e); throw e; diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml 
b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml index 463531d..19fc29e 100644 --- a/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml +++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/ingestionJobSpec.yaml @@ -17,6 +17,21 @@ # under the License. # +# executionFrameworkSpec: Defines the execution framework and the runner classes used to run ingestion jobs. +executionFrameworkSpec: + + # name: execution framework name + name: 'standalone' + + # segmentGenerationJobRunnerClassName: class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment generation. + segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentGenerationJobRunner' + + # segmentTarPushJobRunnerClassName: class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment tar push. + segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentTarPushJobRunner' + + # segmentUriPushJobRunnerClassName: class name of the org.apache.pinot.spi.ingestion.runner.IngestionJobRunner implementation used for segment URI push. + segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.standalone.SegmentUriPushJobRunner' + # jobType: Pinot ingestion job type. # Supported job types are: # 'SegmentCreation' --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
