This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch make-quickstart-working
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ab0808277db608aa4b011147e5eccf817376607e
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 15 01:18:59 2020 -0800

    Make quickstart working with IDE
---
 pinot-distribution/pom.xml                         |  4 ++
 .../spi/ingestion/batch/IngestionJobLauncher.java  | 66 +++++++++++-----------
 pinot-tools/pom.xml                                |  6 ++
 .../tools/admin/command/QuickstartRunner.java      | 18 +++++-
 4 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index 5c3bd85..7910663 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -84,6 +84,10 @@
           <artifactId>pinot-kafka-${kafka.version}</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.apache.pinot</groupId>
+          <artifactId>pinot-batch-ingestion-standalone</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-common</artifactId>
         </exclusion>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
index 43bcf9e..2a615cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncher.java
@@ -51,44 +51,46 @@ public class IngestionJobLauncher {
     String jobSpecFilePath = args[0];
 
     try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
-      Yaml yaml = new Yaml();
-      SegmentGenerationJobSpec spec = yaml.loadAs(reader, 
SegmentGenerationJobSpec.class);
-      StringWriter sw = new StringWriter();
-      yaml.dump(spec, sw);
-      LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString());
+      SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, 
SegmentGenerationJobSpec.class);
+      runIngestionJob(spec);
+    }
+  }
 
-      ExecutionFrameworkSpec executionFramework = 
spec.getExecutionFrameworkSpec();
-      PinotIngestionJobType jobType = 
PinotIngestionJobType.valueOf(spec.getJobType());
-      switch (jobType) {
-        case SegmentCreation:
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
-          break;
-        case SegmentTarPush:
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
-          break;
-        case SegmentUriPush:
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
-          break;
-        case SegmentCreationAndTarPush:
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
-          break;
-        case SegmentCreationAndUriPush:
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
-          kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
-          break;
-        default:
-          LOGGER.error("Unsupported job type - {}. Support job types: {}", 
spec.getJobType(),
-              Arrays.toString(PinotIngestionJobType.values()));
-          throw new RuntimeException("Unsupported job type - " + 
spec.getJobType());
-      }
+  public static void runIngestionJob(SegmentGenerationJobSpec spec)
+      throws Exception {
+    StringWriter sw = new StringWriter();
+    new Yaml().dump(spec, sw);
+    LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString());
+    ExecutionFrameworkSpec executionFramework = 
spec.getExecutionFrameworkSpec();
+    PinotIngestionJobType jobType = 
PinotIngestionJobType.valueOf(spec.getJobType());
+    switch (jobType) {
+      case SegmentCreation:
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
+        break;
+      case SegmentTarPush:
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
+        break;
+      case SegmentUriPush:
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
+        break;
+      case SegmentCreationAndTarPush:
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentTarPushJobRunnerClassName());
+        break;
+      case SegmentCreationAndUriPush:
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentGenerationJobRunnerClassName());
+        kickoffIngestionJob(spec, 
executionFramework.getSegmentUriPushJobRunnerClassName());
+        break;
+      default:
+        LOGGER.error("Unsupported job type - {}. Support job types: {}", 
spec.getJobType(),
+            Arrays.toString(PinotIngestionJobType.values()));
+        throw new RuntimeException("Unsupported job type - " + 
spec.getJobType());
     }
   }
 
   private static void kickoffIngestionJob(SegmentGenerationJobSpec spec, 
String ingestionJobRunnerClassName)
       throws Exception {
-    IngestionJobRunner ingestionJobRunner =
-        PluginManager.get().createInstance(ingestionJobRunnerClassName);
+    IngestionJobRunner ingestionJobRunner = 
PluginManager.get().createInstance(ingestionJobRunnerClassName);
     ingestionJobRunner.init(spec);
     ingestionJobRunner.run();
   }
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index ea336ad..275e1b2 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -71,6 +71,12 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-batch-ingestion-standalone</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
index 4930a14..bdbbaa6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java
@@ -19,17 +19,23 @@
 package org.apache.pinot.tools.admin.command;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.io.Reader;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
+import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.tools.QuickstartTableRequest;
+import org.yaml.snakeyaml.Yaml;
 
 
 public class QuickstartRunner {
@@ -167,7 +173,15 @@ public class QuickstartRunner {
       throws Exception {
     for (QuickstartTableRequest request : _tableRequests) {
       if (request.getTableType() == TableType.OFFLINE) {
-        IngestionJobLauncher.main(new 
String[]{request.getIngestionJobFile().getAbsolutePath()});
+        try (Reader reader = new BufferedReader(new 
FileReader(request.getIngestionJobFile().getAbsolutePath()))) {
+          SegmentGenerationJobSpec spec = new Yaml().loadAs(reader, 
SegmentGenerationJobSpec.class);
+          String inputDirURI = spec.getInputDirURI();
+          if (!new File(inputDirURI).exists()) {
+            URL resolvedInputDirURI = 
QuickstartRunner.class.getClassLoader().getResource(inputDirURI);
+            spec.setInputDirURI(resolvedInputDirURI.toURI().toString());
+          }
+          IngestionJobLauncher.runIngestionJob(spec);
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to