This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch standalone-pbnj
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/standalone-pbnj by this push:
     new 1aa6c94  Make whole pipeline works
1aa6c94 is described below

commit 1aa6c94c25c93ce68309f5df312995007c73262c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Dec 2 00:05:50 2019 -0800

    Make whole pipeline works
---
 pinot-ingestion-jobs/pinot-standalone/pom.xml      | 85 +++++++++++++++++++++-
 .../common/SegmentGenerationTaskRunner.java        | 11 ++-
 .../common/SegmentGenerationTaskSpec.java          |  3 -
 .../ingestion/common/SegmentNameGeneratorSpec.java |  5 +-
 .../standalone/SegmentGenerationJobRunner.java     | 80 +++++++++++++++++---
 .../standalone/StandaloneIngestionJobLauncher.java | 34 +++++++--
 .../src/main/resources/jobSpec.yaml                | 37 ++++++++++
 7 files changed, 228 insertions(+), 27 deletions(-)

diff --git a/pinot-ingestion-jobs/pinot-standalone/pom.xml 
b/pinot-ingestion-jobs/pinot-standalone/pom.xml
index cd32f53..97a9109 100644
--- a/pinot-ingestion-jobs/pinot-standalone/pom.xml
+++ b/pinot-ingestion-jobs/pinot-standalone/pom.xml
@@ -35,12 +35,87 @@
   <properties>
     <pinot.root>${basedir}/../..</pinot.root>
   </properties>
+  <profiles>
+    <profile>
+      <id>build-shaded-jar</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>3.2.1</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <!--
+                  Usually in hadoop environment, there are multiple jars with 
different versions.
+                  Most of the NoSuchMethodExceptions are caused by class 
loading conflicts.
+                  Class relocation ensures the reference of certain 
packages/classes in Pinot code to
+                  shaded libs, e.g. jackson or guava.
+                  Ref: 
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
+                  -->
+                  <relocations>
+                    <relocation>
+                      <pattern>com.google.common.base</pattern>
+                      
<shadedPattern>shaded.com.google.common.base</shadedPattern>
+                    </relocation>
+                    <relocation>
+                      <pattern>com.fasterxml.jackson</pattern>
+                      
<shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
+                    </relocation>
+                  </relocations>
+                  <transformers>
+                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                      <mainClass>org.apache.pinot.ingestion.standalone.StandaloneIngestionJobLauncher</mainClass>
+                    </transformer>
+                  </transformers>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-ingestion-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-parquet</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.pinot</groupId>
+          <artifactId>pinot-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
       <exclusions>
         <exclusion>
           <groupId>commons-logging</groupId>
@@ -57,6 +132,10 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>commons-configuration</groupId>
+      <artifactId>commons-configuration</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>
@@ -73,6 +152,10 @@
       <artifactId>commons-math3</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
     <!-- test -->
     <dependency>
       <groupId>org.testng</groupId>
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
index d0a7bcc..619b247 100644
--- 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.net.URLClassLoader;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -72,7 +71,7 @@ public class SegmentGenerationTaskRunner {
 
     if (readerConfigClassName != null) {
       Map<String, String> configs = 
_taskSpec.getRecordReaderSpec().getConfigs();
-      if(configs == null) {
+      if (configs == null) {
         configs = new HashMap<>();
       }
       JsonNode jsonNode = new ObjectMapper().valueToTree(configs);
@@ -86,11 +85,15 @@ public class SegmentGenerationTaskRunner {
     recordReader.init(new File(_taskSpec.getInputFilePath()), schema, 
recordReaderConfig);
 
     //init segmentName Generator
-    String segmentNameGeneratorType = 
_taskSpec.getSegmentNameGeneratorSpec().getType();
+    SegmentNameGeneratorSpec segmentNameGeneratorSpec = 
_taskSpec.getSegmentNameGeneratorSpec();
+    if (segmentNameGeneratorSpec == null) {
+      segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+    }
+    String segmentNameGeneratorType = segmentNameGeneratorSpec.getType();
     if (segmentNameGeneratorType == null) {
       segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
     }
-    Map<String, String> segmentNameGeneratorConfigs = 
_taskSpec.getSegmentNameGeneratorSpec().getConfigs();
+    Map<String, String> segmentNameGeneratorConfigs = 
segmentNameGeneratorSpec.getConfigs();
     SegmentNameGenerator segmentNameGenerator;
     switch (segmentNameGeneratorType) {
       case SIMPLE_SEGMENT_NAME_GENERATOR:
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
index 04fdbf3..1b20740 100644
--- 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pinot.ingestion.common;
 
-import java.net.URI;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 public class SegmentGenerationTaskSpec {
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
index 0b1c215..565028a 100644
--- 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pinot.ingestion.common;
 
+import java.util.HashMap;
 import java.util.Map;
 
 
 public class SegmentNameGeneratorSpec {
 
-  String _type;
+  String _type = null;
 
-  Map<String, String> _configs;
+  Map<String, String> _configs = new HashMap<>();
 
   public String getType() {
     return _type;
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
index d6127bc..da04de3 100644
--- 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
@@ -18,27 +18,38 @@
  */
 package org.apache.pinot.ingestion.standalone;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
-import java.io.FilenameFilter;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.MapConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.filesystem.PinotFSFactory;
+import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.common.PinotFSSpec;
 import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
 import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.filesystem.PinotFS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class SegmentGenerationJobRunner {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
+
   private SegmentGenerationJobSpec _spec;
 
   public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
@@ -62,14 +73,40 @@ public class SegmentGenerationJobRunner {
     //Get outputFS for writing output pinot segments
     URI outputDirURI = new URI(_spec.getOutputDirURI());
     PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+    outputDirFS.mkdir(outputDirURI);
 
     //Get list of files to process
     String[] files = inputDirFS.listFiles(inputDirURI, true);
+
     //TODO: sort input files based on creation time
-    //TODO: handle input file name filters
+    List<String> filteredFiles = new ArrayList<>();
+    PathMatcher includeFilePathMatcher = null;
+    if (_spec.getIncludeFileNamePattern() != null) {
+      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
+    }
+    PathMatcher excludeFilePathMatcher = null;
+    if (_spec.getExcludeFileNamePattern() != null) {
+      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
+    }
+
+    for (String file : files) {
+      if (includeFilePathMatcher != null) {
+        if (!includeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (excludeFilePathMatcher != null) {
+        if (excludeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (!inputDirFS.isDirectory(new URI(file))) {
+        filteredFiles.add(file);
+      }
+    }
 
     //create tempDirectory for input and output
-    File tempDirectory = FileUtils.getTempDirectory();
+    File tempDirectory = new File(FileUtils.getTempDirectory(), "pinot-" + 
System.currentTimeMillis());
     File localInputDir = new File(tempDirectory, "input");
     FileUtils.forceMkdir(localInputDir);
     File localOutputTempDirectory = new File(tempDirectory, "output");
@@ -79,12 +116,13 @@ public class SegmentGenerationJobRunner {
     String schemaJson = IOUtils.toString(new 
URI(_spec.getTableSpec().getSchemaURI()), "UTF-8");
     Schema schema = Schema.fromString(schemaJson);
     String tableConfigJson = IOUtils.toString(new 
URI(_spec.getTableSpec().getTableConfigURI()), "UTF-8");
-    TableConfig tableConfig = TableConfig.fromJsonString(tableConfigJson);
+    JsonNode offlineTableJsonNode = new 
ObjectMapper().readTree(tableConfigJson).get("OFFLINE");
+    TableConfig tableConfig = TableConfig.fromJsonConfig(offlineTableJsonNode);
     //iterate on the file list, for each
-    for (int i = 0; i < files.length; i++) {
+    for (int i = 0; i < filteredFiles.size(); i++) {
       //copy input path to local
-      File inputDataFile = new File(localInputDir, new 
File(files[i]).getName());
-      inputDirFS.copyToLocalFile(new URI(files[i]), inputDataFile);
+      File inputDataFile = new File(localInputDir, new 
File(filteredFiles.get(i)).getName());
+      inputDirFS.copyToLocalFile(new URI(filteredFiles.get(i)), inputDataFile);
 
       //create taskspec
       SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
@@ -100,12 +138,30 @@ public class SegmentGenerationJobRunner {
       SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
       String segmentName = taskRunner.run();
 
-      //move segment to output PinotFS
+      // Tar segment directory to compress file
 
-      File outputSegmentFile = new File(localOutputTempDirectory, segmentName 
+ ".tar.gz");
-      outputDirFS.copyFromLocalFile(outputSegmentFile, outputDirURI);
+      File localSegmentDir = new File(localOutputTempDirectory, segmentName);
+      String segmentTarFileName = segmentName + 
JobConfigConstants.TAR_GZ_FILE_EXT;
+      File localSegmentTarFile = new File(localOutputTempDirectory, 
segmentTarFileName);
+      LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, 
localSegmentTarFile);
+      TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), 
localSegmentTarFile.getPath());
 
-      FileUtils.deleteQuietly(outputSegmentFile);
+      long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+      long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+      LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", 
segmentName,
+          DataSize.fromBytes(uncompressedSegmentSize), 
DataSize.fromBytes(compressedSegmentSize));
+
+      //move segment to output PinotFS
+      String outputSegmentTarName = outputDirURI.toString() + "/" + 
segmentTarFileName;
+      URI outputSegmentTarURI = new URI(outputSegmentTarName);
+      if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
+        LOGGER.warn("Not overwrite existing output segment tar file: {}", outputSegmentTarURI);
+      } else {
+        outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      }
+
+      FileUtils.deleteQuietly(localSegmentDir);
+      FileUtils.deleteQuietly(localSegmentTarFile);
       FileUtils.deleteQuietly(inputDataFile);
     }
     //clean up
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
index b1d3741..04790a9 100644
--- 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
@@ -22,24 +22,48 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.Reader;
 import java.io.StringWriter;
+import java.util.Arrays;
+import org.apache.pinot.ingestion.common.PinotIngestionJobType;
 import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
 
 
 public class StandaloneIngestionJobLauncher {
 
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(StandaloneIngestionJobLauncher.class);
 
-  public static void main(String[] args) throws Exception{
-    String jobSpecFilePath = args[0];
+  private static final String USAGE = "usage: [jobSpec.yaml]";
+
+  private static void usage() {
+    System.err.println(USAGE);
+  }
 
-    try(Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+  public static void main(String[] args)
+      throws Exception {
+    if (args.length != 1) {
+      usage();
+      System.exit(1);
+    }
+    String jobSpecFilePath = args[0];
 
+    try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
       Yaml yaml = new Yaml();
       SegmentGenerationJobSpec spec = yaml.loadAs(reader, 
SegmentGenerationJobSpec.class);
       StringWriter sw = new StringWriter();
       yaml.dump(spec, sw);
-      System.out.println("dump = " + sw.toString());
-
+      LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString());
+      PinotIngestionJobType jobType = 
PinotIngestionJobType.valueOf(spec.getJobType());
+      switch (jobType) {
+        case SegmentCreation:
+          new SegmentGenerationJobRunner(spec).run();
+          break;
+        default:
+          LOGGER.error("Unsupported job type - {}. Support job types: {}", 
spec.getJobType(),
+              Arrays.toString(PinotIngestionJobType.values()));
+          throw new RuntimeException("Unsupported job type - " + 
spec.getJobType());
+      }
     }
   }
 }
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml 
b/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml
new file mode 100644
index 0000000..d2bd586
--- /dev/null
+++ b/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+jobType: SegmentCreation
+inputDirURI: 'file:///Users/xiangfu/sample_data/input1'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///Users/xiangfu/sample_data/output2'
+overwriteOutput: true
+pinotFSSpecs:
+  - scheme: file
+    className: org.apache.pinot.filesystem.LocalPinotFS
+recordReaderSpec:
+  dataFormat: 'parquet'
+  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+  tableName: 'ops_wb_table'
+  schemaURI: 'http://localhost:9000/tables/ops_wb_table/schema'
+  tableConfigURI: 'http://localhost:9000/tables/ops_wb_table?type=offline'
+pinotClusterSpecs:
+  - controllerURI: 'http://localhost:9000'
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to