This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch make-quickstart-working in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ab0808277db608aa4b011147e5eccf817376607e Author: Xiang Fu <[email protected]> AuthorDate: Wed Jan 15 01:18:59 2020 -0800 Make quickstart working with IDE --- pinot-distribution/pom.xml | 4 ++ .../spi/ingestion/batch/IngestionJobLauncher.java | 66 +++++++++++----------- pinot-tools/pom.xml | 6 ++ .../tools/admin/command/QuickstartRunner.java | 18 +++++- 4 files changed, 60 insertions(+), 34 deletions(-) diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml index 5c3bd85..7910663 100644 --- a/pinot-distribution/pom.xml +++ b/pinot-distribution/pom.xml @@ -84,6 +84,10 @@ <artifactId>pinot-kafka-${kafka.version}</artifactId> </exclusion> <exclusion> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-batch-ingestion-standalone</artifactId> + </exclusion> + <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java index 43bcf9e..2a615cf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java @@ -51,44 +51,46 @@ public class IngestionJobLauncher { String jobSpecFilePath = args[0]; try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) { - Yaml yaml = new Yaml(); - SegmentGenerationJobSpec spec = yaml.loadAs(reader, SegmentGenerationJobSpec.class); - StringWriter sw = new StringWriter(); - yaml.dump(spec, sw); - LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString()); + SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); + runIngestionJob(spec); + } + } - ExecutionFrameworkSpec executionFramework = spec.getExecutionFrameworkSpec(); - PinotIngestionJobType jobType = PinotIngestionJobType.valueOf(spec.getJobType()); - switch (jobType) { - case SegmentCreation: - kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); - break; - case SegmentTarPush: - kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName()); - break; - case SegmentUriPush: - kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); - break; - case SegmentCreationAndTarPush: - kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); - kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName()); - break; - case SegmentCreationAndUriPush: - kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); - kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); - break; - default: - LOGGER.error("Unsupported job type - {}. Support job types: {}", spec.getJobType(), - Arrays.toString(PinotIngestionJobType.values())); - throw new RuntimeException("Unsupported job type - " + spec.getJobType()); - } + public static void runIngestionJob(SegmentGenerationJobSpec spec) + throws Exception { + StringWriter sw = new StringWriter(); + new Yaml().dump(spec, sw); + LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString()); + ExecutionFrameworkSpec executionFramework = spec.getExecutionFrameworkSpec(); + PinotIngestionJobType jobType = PinotIngestionJobType.valueOf(spec.getJobType()); + switch (jobType) { + case SegmentCreation: + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); + break; + case SegmentTarPush: + kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName()); + break; + case SegmentUriPush: + kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); + break; + case SegmentCreationAndTarPush: + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); + kickoffIngestionJob(spec, executionFramework.getSegmentTarPushJobRunnerClassName()); + break; + case SegmentCreationAndUriPush: + kickoffIngestionJob(spec, executionFramework.getSegmentGenerationJobRunnerClassName()); + kickoffIngestionJob(spec, executionFramework.getSegmentUriPushJobRunnerClassName()); + break; + default: + LOGGER.error("Unsupported job type - {}. Support job types: {}", spec.getJobType(), + Arrays.toString(PinotIngestionJobType.values())); + throw new RuntimeException("Unsupported job type - " + spec.getJobType()); } } private static void kickoffIngestionJob(SegmentGenerationJobSpec spec, String ingestionJobRunnerClassName) throws Exception { - IngestionJobRunner ingestionJobRunner = - PluginManager.get().createInstance(ingestionJobRunnerClassName); + IngestionJobRunner ingestionJobRunner = PluginManager.get().createInstance(ingestionJobRunnerClassName); ingestionJobRunner.init(spec); ingestionJobRunner.run(); } diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index ea336ad..275e1b2 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -71,6 +71,12 @@ <scope>runtime</scope> </dependency> <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-batch-ingestion-standalone</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index 4930a14..bdbbaa6 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -19,17 +19,23 @@ package org.apache.pinot.tools.admin.command; import com.fasterxml.jackson.databind.JsonNode; +import java.io.BufferedReader; import java.io.File; +import java.io.FileReader; import java.io.IOException; +import java.io.Reader; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; +import org.apache.pinot.common.utils.TenantRole; import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher; +import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.common.utils.TenantRole; import org.apache.pinot.tools.QuickstartTableRequest; +import org.yaml.snakeyaml.Yaml; public class QuickstartRunner { @@ -167,7 +173,15 @@ public class QuickstartRunner { throws Exception { for (QuickstartTableRequest request : _tableRequests) { if (request.getTableType() == TableType.OFFLINE) { - IngestionJobLauncher.main(new String[]{request.getIngestionJobFile().getAbsolutePath()}); + try (Reader reader = new BufferedReader(new FileReader(request.getIngestionJobFile().getAbsolutePath()))) { + SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, SegmentGenerationJobSpec.class); + String inputDirURI = spec.getInputDirURI(); + if (!new File(inputDirURI).exists()) { + URL resolvedInputDirURI = QuickstartRunner.class.getClassLoader().getResource(inputDirURI); + spec.setInputDirURI(resolvedInputDirURI.toURI().toString()); + } + IngestionJobLauncher.runIngestionJob(spec); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
