This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch standalone-pbnj
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/standalone-pbnj by this push:
new 1aa6c94 Make whole pipeline works
1aa6c94 is described below
commit 1aa6c94c25c93ce68309f5df312995007c73262c
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Dec 2 00:05:50 2019 -0800
Make whole pipeline works
---
pinot-ingestion-jobs/pinot-standalone/pom.xml | 85 +++++++++++++++++++++-
.../common/SegmentGenerationTaskRunner.java | 11 ++-
.../common/SegmentGenerationTaskSpec.java | 3 -
.../ingestion/common/SegmentNameGeneratorSpec.java | 5 +-
.../standalone/SegmentGenerationJobRunner.java | 80 +++++++++++++++++---
.../standalone/StandaloneIngestionJobLauncher.java | 34 +++++++--
.../src/main/resources/jobSpec.yaml | 37 ++++++++++
7 files changed, 228 insertions(+), 27 deletions(-)
diff --git a/pinot-ingestion-jobs/pinot-standalone/pom.xml
b/pinot-ingestion-jobs/pinot-standalone/pom.xml
index cd32f53..97a9109 100644
--- a/pinot-ingestion-jobs/pinot-standalone/pom.xml
+++ b/pinot-ingestion-jobs/pinot-standalone/pom.xml
@@ -35,12 +35,87 @@
<properties>
<pinot.root>${basedir}/../..</pinot.root>
</properties>
+ <profiles>
+ <profile>
+ <id>build-shaded-jar</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <!--
+ In a Hadoop environment there are usually multiple versions of the same jars on the classpath.
+ Most NoSuchMethodErrors are caused by the resulting class-loading conflicts.
+ Class relocation makes Pinot code reference its own shaded copies of certain packages/classes
+ (e.g. Jackson or Guava) instead of whatever versions the environment provides.
+ Ref: https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html
+ -->
+ <relocations>
+ <relocation>
+ <pattern>com.google.common.base</pattern>
+
<shadedPattern>shaded.com.google.common.base</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+
<shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.pinot.spark.PinotSparkJobLauncher</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
<dependencies>
<dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-ingestion-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-parquet</artifactId>
+ <version>${project.version}</version>
+ <scope>shaded</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -57,6 +132,10 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
@@ -73,6 +152,10 @@
<artifactId>commons-math3</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<!-- test -->
<dependency>
<groupId>org.testng</groupId>
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
index d0a7bcc..619b247 100644
---
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
+++
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.File;
-import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -72,7 +71,7 @@ public class SegmentGenerationTaskRunner {
if (readerConfigClassName != null) {
Map<String, String> configs =
_taskSpec.getRecordReaderSpec().getConfigs();
- if(configs == null) {
+ if (configs == null) {
configs = new HashMap<>();
}
JsonNode jsonNode = new ObjectMapper().valueToTree(configs);
@@ -86,11 +85,15 @@ public class SegmentGenerationTaskRunner {
recordReader.init(new File(_taskSpec.getInputFilePath()), schema,
recordReaderConfig);
//init segmentName Generator
- String segmentNameGeneratorType =
_taskSpec.getSegmentNameGeneratorSpec().getType();
+ SegmentNameGeneratorSpec segmentNameGeneratorSpec =
_taskSpec.getSegmentNameGeneratorSpec();
+ if (segmentNameGeneratorSpec == null) {
+ segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
+ }
+ String segmentNameGeneratorType = segmentNameGeneratorSpec.getType();
if (segmentNameGeneratorType == null) {
segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
}
- Map<String, String> segmentNameGeneratorConfigs =
_taskSpec.getSegmentNameGeneratorSpec().getConfigs();
+ Map<String, String> segmentNameGeneratorConfigs =
segmentNameGeneratorSpec.getConfigs();
SegmentNameGenerator segmentNameGenerator;
switch (segmentNameGeneratorType) {
case SIMPLE_SEGMENT_NAME_GENERATOR:
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
index 04fdbf3..1b20740 100644
---
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
+++
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.ingestion.common;
-import java.net.URI;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.RecordReader;
public class SegmentGenerationTaskSpec {
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
index 0b1c215..565028a 100644
---
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
+++
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
@@ -18,14 +18,15 @@
*/
package org.apache.pinot.ingestion.common;
+import java.util.HashMap;
import java.util.Map;
public class SegmentNameGeneratorSpec {
- String _type;
+ String _type = null;
- Map<String, String> _configs;
+ Map<String, String> _configs = new HashMap<>();
public String getType() {
return _type;
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
index d6127bc..da04de3 100644
---
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
+++
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
@@ -18,27 +18,38 @@
*/
package org.apache.pinot.ingestion.standalone;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
-import java.io.FilenameFilter;
import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.MapConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.utils.DataSize;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.filesystem.PinotFSFactory;
+import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.common.PinotFSSpec;
import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner;
import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SegmentGenerationJobRunner {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentGenerationJobRunner.class);
+
private SegmentGenerationJobSpec _spec;
public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
@@ -62,14 +73,40 @@ public class SegmentGenerationJobRunner {
//Get outputFS for writing output pinot segments
URI outputDirURI = new URI(_spec.getOutputDirURI());
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+ outputDirFS.mkdir(outputDirURI);
//Get list of files to process
String[] files = inputDirFS.listFiles(inputDirURI, true);
+
//TODO: sort input files based on creation time
- //TODO: handle input file name filters
+ List<String> filteredFiles = new ArrayList<>();
+ PathMatcher includeFilePathMatcher = null;
+ if (_spec.getIncludeFileNamePattern() != null) {
+ includeFilePathMatcher =
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
+ }
+ PathMatcher excludeFilePathMatcher = null;
+ if (_spec.getExcludeFileNamePattern() != null) {
+ excludeFilePathMatcher =
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
+ }
+
+ for (String file : files) {
+ if (includeFilePathMatcher != null) {
+ if (!includeFilePathMatcher.matches(Paths.get(file))) {
+ continue;
+ }
+ }
+ if (excludeFilePathMatcher != null) {
+ if (excludeFilePathMatcher.matches(Paths.get(file))) {
+ continue;
+ }
+ }
+ if (!inputDirFS.isDirectory(new URI(file))) {
+ filteredFiles.add(file);
+ }
+ }
//create tempDirectory for input and output
- File tempDirectory = FileUtils.getTempDirectory();
+ File tempDirectory = new File(FileUtils.getTempDirectory(), "pinot-" +
System.currentTimeMillis());
File localInputDir = new File(tempDirectory, "input");
FileUtils.forceMkdir(localInputDir);
File localOutputTempDirectory = new File(tempDirectory, "output");
@@ -79,12 +116,13 @@ public class SegmentGenerationJobRunner {
String schemaJson = IOUtils.toString(new
URI(_spec.getTableSpec().getSchemaURI()), "UTF-8");
Schema schema = Schema.fromString(schemaJson);
String tableConfigJson = IOUtils.toString(new
URI(_spec.getTableSpec().getTableConfigURI()), "UTF-8");
- TableConfig tableConfig = TableConfig.fromJsonString(tableConfigJson);
+ JsonNode offlineTableJsonNode = new
ObjectMapper().readTree(tableConfigJson).get("OFFLINE");
+ TableConfig tableConfig = TableConfig.fromJsonConfig(offlineTableJsonNode);
//iterate on the file list, for each
- for (int i = 0; i < files.length; i++) {
+ for (int i = 0; i < filteredFiles.size(); i++) {
//copy input path to local
- File inputDataFile = new File(localInputDir, new
File(files[i]).getName());
- inputDirFS.copyToLocalFile(new URI(files[i]), inputDataFile);
+ File inputDataFile = new File(localInputDir, new
File(filteredFiles.get(i)).getName());
+ inputDirFS.copyToLocalFile(new URI(filteredFiles.get(i)), inputDataFile);
//create taskspec
SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
@@ -100,12 +138,30 @@ public class SegmentGenerationJobRunner {
SegmentGenerationTaskRunner taskRunner = new
SegmentGenerationTaskRunner(taskSpec);
String segmentName = taskRunner.run();
- //move segment to output PinotFS
+ // Tar segment directory to compress file
- File outputSegmentFile = new File(localOutputTempDirectory, segmentName
+ ".tar.gz");
- outputDirFS.copyFromLocalFile(outputSegmentFile, outputDirURI);
+ File localSegmentDir = new File(localOutputTempDirectory, segmentName);
+ String segmentTarFileName = segmentName +
JobConfigConstants.TAR_GZ_FILE_EXT;
+ File localSegmentTarFile = new File(localOutputTempDirectory,
segmentTarFileName);
+ LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir,
localSegmentTarFile);
+ TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(),
localSegmentTarFile.getPath());
- FileUtils.deleteQuietly(outputSegmentFile);
+ long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+ long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+ LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}",
segmentName,
+ DataSize.fromBytes(uncompressedSegmentSize),
DataSize.fromBytes(compressedSegmentSize));
+
+ //move segment to output PinotFS
+ String outputSegmentTarName = outputDirURI.toString() + "/" +
segmentTarFileName;
+ URI outputSegmentTarURI = new URI(outputSegmentTarName);
+ if (!_spec.isOverwriteOutput() &&
outputDirFS.exists(outputSegmentTarURI)) {
+ LOGGER.warn("Not overwrite existing output segment tar file: {}",
outputDirFS.exists(outputSegmentTarURI));
+ } else {
+ outputDirFS.copyFromLocalFile(localSegmentTarFile,
outputSegmentTarURI);
+ }
+
+ FileUtils.deleteQuietly(localSegmentDir);
+ FileUtils.deleteQuietly(localSegmentTarFile);
FileUtils.deleteQuietly(inputDataFile);
}
//clean up
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
index b1d3741..04790a9 100644
---
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
+++
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
@@ -22,24 +22,48 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.io.StringWriter;
+import java.util.Arrays;
+import org.apache.pinot.ingestion.common.PinotIngestionJobType;
import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
public class StandaloneIngestionJobLauncher {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(StandaloneIngestionJobLauncher.class);
- public static void main(String[] args) throws Exception{
- String jobSpecFilePath = args[0];
+ private static final String USAGE = "usage: [jobSpec.yaml]";
+
+ private static void usage() {
+ System.err.println(USAGE);
+ }
- try(Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+ public static void main(String[] args)
+ throws Exception {
+ if (args.length != 1) {
+ usage();
+ System.exit(1);
+ }
+ String jobSpecFilePath = args[0];
+ try (Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
Yaml yaml = new Yaml();
SegmentGenerationJobSpec spec = yaml.loadAs(reader,
SegmentGenerationJobSpec.class);
StringWriter sw = new StringWriter();
yaml.dump(spec, sw);
- System.out.println("dump = " + sw.toString());
-
+ LOGGER.info("SegmentGenerationJobSpec: \n{}", sw.toString());
+ PinotIngestionJobType jobType =
PinotIngestionJobType.valueOf(spec.getJobType());
+ switch (jobType) {
+ case SegmentCreation:
+ new SegmentGenerationJobRunner(spec).run();
+ break;
+ default:
+ LOGGER.error("Unsupported job type - {}. Support job types: {}",
spec.getJobType(),
+ Arrays.toString(PinotIngestionJobType.values()));
+ throw new RuntimeException("Unsupported job type - " +
spec.getJobType());
+ }
}
}
}
diff --git
a/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml
b/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml
new file mode 100644
index 0000000..d2bd586
--- /dev/null
+++ b/pinot-ingestion-jobs/pinot-standalone/src/main/resources/jobSpec.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+jobType: SegmentCreation
+inputDirURI: 'file:///Users/xiangfu/sample_data/input1'
+includeFileNamePattern: 'glob:**/*.parquet'
+excludeFileNamePattern: 'glob:**/*.avro'
+outputDirURI: 'file:///Users/xiangfu/sample_data/output2'
+overwriteOutput: true
+pinotFSSpecs:
+ - scheme: file
+ className: org.apache.pinot.filesystem.LocalPinotFS
+recordReaderSpec:
+ dataFormat: 'parquet'
+ className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
+tableSpec:
+ tableName: 'ops_wb_table'
+ schemaURI: 'http://localhost:9000/tables/ops_wb_table/schema'
+ tableConfigURI: 'http://localhost:9000/tables/ops_wb_table?type=offline'
+pinotClusterSpecs:
+ - controllerURI: 'localhost:9000'
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]