This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch standalone-pbnj
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f43624559a06fbb76b3f4bac1cdcec44712a8099
Author: kishoreg <[email protected]>
AuthorDate: Sun Dec 1 18:55:25 2019 -0800

    Adding pinot-standalone module to generate segments
---
 .../apache/pinot/filesystem/PinotFSFactory.java    |  24 +--
 .../{ => pinot-standalone}/pom.xml                 |  77 +++++++---
 .../pinot/ingestion/common/PinotClusterSpec.java   |  32 ++++
 .../apache/pinot/ingestion/common/PinotFSSpec.java |  55 +++++++
 .../pinot/ingestion/common/RecordReaderSpec.java   |  65 +++++++++
 .../ingestion/common/SegmentGenerationJobSpec.java | 162 +++++++++++++++++++++
 .../common/SegmentGenerationTaskRunner.java        | 132 +++++++++++++++++
 .../common/SegmentGenerationTaskSpec.java          |  99 +++++++++++++
 .../ingestion/common/SegmentNameGeneratorSpec.java |  45 ++++++
 .../apache/pinot/ingestion/common/TableSpec.java   |  52 +++++++
 .../standalone/SegmentGenerationJobRunner.java     | 114 +++++++++++++++
 .../standalone/StandaloneIngestionJobLauncher.java |  45 ++++++
 pinot-ingestion-jobs/pom.xml                       |   1 +
 13 files changed, 868 insertions(+), 35 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java 
b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
index 9f0c98c..4e41dab 100644
--- a/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/PinotFSFactory.java
@@ -43,6 +43,18 @@ public class PinotFSFactory {
 
   }
 
+  public static void register(String scheme, String fsClassName, Configuration 
configuration) {
+    try {
+      LOGGER.info("Initializing PinotFS for scheme {}, classname {}", scheme, 
fsClassName);
+      PinotFS pinotFS = (PinotFS) Class.forName(fsClassName).newInstance();
+      pinotFS.init(configuration);
+      _fileSystemMap.put(scheme, pinotFS);
+    } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
+      LOGGER.error("Could not instantiate file system for class {}", 
fsClassName, e);
+      throw new RuntimeException(e);
+    }
+  }
+
   public static void init(Configuration fsConfig) {
     // Get schemes and their respective classes
     Iterator<String> keys = fsConfig.subset(CLASS).getKeys();
@@ -53,17 +65,7 @@ public class PinotFSFactory {
       String key = keys.next();
       String fsClassName = (String) fsConfig.getProperty(CLASS + "." + key);
       LOGGER.info("Got scheme {}, classname {}, starting to initialize", key, 
fsClassName);
-
-      try {
-        PinotFS pinotFS = (PinotFS) Class.forName(fsClassName).newInstance();
-        pinotFS.init(fsConfig.subset(key));
-
-        LOGGER.info("Initializing PinotFS for scheme {}, classname {}", key, 
fsClassName);
-        _fileSystemMap.put(key, pinotFS);
-      } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
-        LOGGER.error("Could not instantiate file system for class {}", 
fsClassName, e);
-        throw new RuntimeException(e);
-      }
+      register(key, fsClassName, fsConfig.subset(key));
     }
 
     if (!_fileSystemMap.containsKey(DEFAULT_FS_SCHEME)) {
diff --git a/pinot-ingestion-jobs/pom.xml 
b/pinot-ingestion-jobs/pinot-standalone/pom.xml
similarity index 51%
copy from pinot-ingestion-jobs/pom.xml
copy to pinot-ingestion-jobs/pinot-standalone/pom.xml
index b89a8f6..cd32f53 100644
--- a/pinot-ingestion-jobs/pom.xml
+++ b/pinot-ingestion-jobs/pinot-standalone/pom.xml
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
 
     Licensed to the Apache Software Foundation (ASF) under one
@@ -22,45 +22,57 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot</artifactId>
+    <artifactId>pinot-ingestion-jobs</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>0.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
   </parent>
-
-  <artifactId>pinot-ingestion-jobs</artifactId>
-  <packaging>pom</packaging>
-  <name>Pinot Ingestion Jobs</name>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>pinot-standalone</artifactId>
+  <name>Pinot Ingestion Standalone</name>
   <url>https://pinot.apache.org/</url>
   <properties>
-    <pinot.root>${basedir}/..</pinot.root>
+    <pinot.root>${basedir}/../..</pinot.root>
   </properties>
 
-  <modules>
-    <module>pinot-ingestion-common</module>
-    <module>pinot-hadoop</module>
-    <module>pinot-spark</module>
-  </modules>
-
   <dependencies>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-core</artifactId>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
       <exclusions>
         <exclusion>
-          <groupId>org.scala-lang</groupId>
-          <artifactId>scala-library</artifactId>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <!-- test -->
     <dependency>
       <groupId>org.testng</groupId>
@@ -69,4 +81,21 @@
     </dependency>
   </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.5</version>
+        <configuration>
+          <forceCreation>true</forceCreation>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
+
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
new file mode 100644
index 0000000..2c8195d
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotClusterSpec.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
/**
 * Describes a Pinot cluster referenced from a {@code SegmentGenerationJobSpec}.
 * Values are set and read through JavaBean-style accessors.
 */
public class PinotClusterSpec {

  /** Controller URI, stored exactly as passed to {@link #setControllerURI(String)}. */
  String _controllerURI;

  /**
   * @param controllerURI controller URI to remember (may be null)
   */
  public void setControllerURI(String controllerURI) {
    this._controllerURI = controllerURI;
  }

  /**
   * @return the controller URI last set, or null if never set
   */
  public String getControllerURI() {
    return this._controllerURI;
  }
}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
new file mode 100644
index 0000000..078c8e5
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/PinotFSSpec.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
+import java.util.Map;
+
+
/**
 * Declares one file-system entry of a segment generation job spec: a scheme name, the name of the class
 * implementing it, and a map of string-valued configs. The map is stored by reference, not copied.
 */
public class PinotFSSpec {

  /** Scheme name for this file system. */
  String _scheme;

  /** Fully-qualified implementation class name. */
  String _className;

  /** Free-form string configs for the implementation. */
  Map<String, String> _configs;

  public void setScheme(String scheme) {
    this._scheme = scheme;
  }

  public String getScheme() {
    return this._scheme;
  }

  public void setClassName(String className) {
    this._className = className;
  }

  public String getClassName() {
    return this._className;
  }

  public void setConfigs(Map<String, String> configs) {
    this._configs = configs;
  }

  public Map<String, String> getConfigs() {
    return this._configs;
  }
}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
new file mode 100644
index 0000000..f374a66
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/RecordReaderSpec.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
+import java.util.Map;
+
+
/**
 * Record-reader section of a segment generation job spec.
 *
 * <p>{@code className} names the record reader implementation. {@code configClassName} optionally names the
 * reader config class. {@code configs} holds string key/value pairs for that config class.
 * {@code dataFormat} is a free-form format label. All values are stored by reference.
 */
public class RecordReaderSpec {

  String _dataFormat;

  String _className;

  String _configClassName;

  Map<String, String> _configs;

  public void setDataFormat(String dataFormat) {
    this._dataFormat = dataFormat;
  }

  public String getDataFormat() {
    return this._dataFormat;
  }

  public void setClassName(String className) {
    this._className = className;
  }

  public String getClassName() {
    return this._className;
  }

  public void setConfigs(Map<String, String> configs) {
    this._configs = configs;
  }

  public Map<String, String> getConfigs() {
    return this._configs;
  }

  public void setConfigClassName(String configClassName) {
    this._configClassName = configClassName;
  }

  public String getConfigClassName() {
    return this._configClassName;
  }
}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
new file mode 100644
index 0000000..1b7b7c9
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationJobSpec.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.yaml.snakeyaml.DumperOptions;
import org.yaml.snakeyaml.Yaml;
+
+
+public class SegmentGenerationJobSpec {
+
+  String jobType;
+
+  String _inputDirURI;
+
+  String _includeFileNamePattern;
+
+  String _excludeFileNamePattern;
+
+  String _outputDirURI;
+
+  boolean _overwriteOutput;
+
+  List<PinotFSSpec> _pinotFSSpecs;
+
+  TableSpec _tableSpec;
+
+  RecordReaderSpec _recordReaderSpec;
+
+  SegmentNameGeneratorSpec _segmentNameGeneratorSpec;
+
+  PinotClusterSpec[] _pinotClusterSpecs;
+
+  public String getJobType() {
+    return jobType;
+  }
+
+  public void setJobType(String jobType) {
+    this.jobType = jobType;
+  }
+
+  public String getInputDirURI() {
+    return _inputDirURI;
+  }
+
+  public void setInputDirURI(String inputDirURI) {
+    _inputDirURI = inputDirURI;
+  }
+
+  public String getIncludeFileNamePattern() {
+    return _includeFileNamePattern;
+  }
+
+  public void setIncludeFileNamePattern(String includeFileNamePattern) {
+    _includeFileNamePattern = includeFileNamePattern;
+  }
+
+  public String getExcludeFileNamePattern() {
+    return _excludeFileNamePattern;
+  }
+
+  public void setExcludeFileNamePattern(String excludeFileNamePattern) {
+    _excludeFileNamePattern = excludeFileNamePattern;
+  }
+
+  public String getOutputDirURI() {
+    return _outputDirURI;
+  }
+
+  public void setOutputDirURI(String outputDirURI) {
+    _outputDirURI = outputDirURI;
+  }
+
+  public boolean isOverwriteOutput() {
+    return _overwriteOutput;
+  }
+
+  public void setOverwriteOutput(boolean overwriteOutput) {
+    _overwriteOutput = overwriteOutput;
+  }
+
+  public List<PinotFSSpec> getPinotFSSpecs() {
+    return _pinotFSSpecs;
+  }
+
+  public void setPinotFSSpecs(List<PinotFSSpec> pinotFSSpecs) {
+    _pinotFSSpecs = pinotFSSpecs;
+  }
+
+  public TableSpec getTableSpec() {
+    return _tableSpec;
+  }
+
+  public void setTableSpec(TableSpec tableSpec) {
+    _tableSpec = tableSpec;
+  }
+
+  public RecordReaderSpec getRecordReaderSpec() {
+    return _recordReaderSpec;
+  }
+
+  public void setRecordReaderSpec(RecordReaderSpec recordReaderSpec) {
+    _recordReaderSpec = recordReaderSpec;
+  }
+
+  public PinotClusterSpec[] getPinotClusterSpecs() {
+    return _pinotClusterSpecs;
+  }
+
+  public void setPinotClusterSpecs(PinotClusterSpec[] pinotClusterSpecs) {
+    _pinotClusterSpecs = pinotClusterSpecs;
+  }
+
+  public SegmentNameGeneratorSpec getSegmentNameGeneratorSpec() {
+    return _segmentNameGeneratorSpec;
+  }
+
+  public void setSegmentNameGeneratorSpec(SegmentNameGeneratorSpec 
segmentNameGeneratorSpec) {
+    _segmentNameGeneratorSpec = segmentNameGeneratorSpec;
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    DumperOptions options = new DumperOptions();
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK);
+    options.setPrettyFlow(true);
+    Yaml yaml = new Yaml(options);
+
+    Reader reader;
+    reader = new BufferedReader(new 
FileReader("/Users/kishoreg/Documents/testJobSpec.yaml"));
+    SegmentGenerationJobSpec loadedSpec = yaml.loadAs(reader, 
SegmentGenerationJobSpec.class);
+    System.out.println("loadedSpec = " + loadedSpec);
+
+    SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+
+    StringWriter sw = new StringWriter();
+    yaml.dump(loadedSpec, sw);
+    System.out.println("dump = " + sw.toString());
+  }
+}
+
+
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
new file mode 100644
index 0000000..d0a7bcc
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskRunner.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.net.URLClassLoader;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeFieldSpec;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public class SegmentGenerationTaskRunner {
+
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = 
"segment.name.generator.type";
+  public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+  public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = 
"normalizedDate";
+  public static final String DEFAULT_SEGMENT_NAME_GENERATOR = 
SIMPLE_SEGMENT_NAME_GENERATOR;
+
+  // For SimpleSegmentNameGenerator
+  public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
+
+  // For NormalizedDateSegmentNameGenerator
+  public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+  public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+
+  private SegmentGenerationTaskSpec _taskSpec;
+
+  public SegmentGenerationTaskRunner(SegmentGenerationTaskSpec taskSpec) {
+    _taskSpec = taskSpec;
+  }
+
+  public String run()
+      throws Exception {
+
+    String tableName = _taskSpec.getTableConfig().getTableName();
+    TableConfig tableConfig = _taskSpec._tableConfig;
+    Schema schema = _taskSpec.getSchema();
+
+    //init record reader config
+    String readerConfigClassName = 
_taskSpec.getRecordReaderSpec().getConfigClassName();
+    RecordReaderConfig recordReaderConfig = null;
+
+    if (readerConfigClassName != null) {
+      Map<String, String> configs = 
_taskSpec.getRecordReaderSpec().getConfigs();
+      if(configs == null) {
+        configs = new HashMap<>();
+      }
+      JsonNode jsonNode = new ObjectMapper().valueToTree(configs);
+      JsonUtils.jsonNodeToObject(jsonNode, 
Class.forName(readerConfigClassName));
+      recordReaderConfig = (RecordReaderConfig) 
Class.forName(readerConfigClassName).newInstance();
+    }
+
+    //init record reader
+    String readerClassName = _taskSpec.getRecordReaderSpec().getClassName();
+    RecordReader recordReader = (RecordReader) 
Class.forName(readerClassName).newInstance();
+    recordReader.init(new File(_taskSpec.getInputFilePath()), schema, 
recordReaderConfig);
+
+    //init segmentName Generator
+    String segmentNameGeneratorType = 
_taskSpec.getSegmentNameGeneratorSpec().getType();
+    if (segmentNameGeneratorType == null) {
+      segmentNameGeneratorType = SIMPLE_SEGMENT_NAME_GENERATOR;
+    }
+    Map<String, String> segmentNameGeneratorConfigs = 
_taskSpec.getSegmentNameGeneratorSpec().getConfigs();
+    SegmentNameGenerator segmentNameGenerator;
+    switch (segmentNameGeneratorType) {
+      case SIMPLE_SEGMENT_NAME_GENERATOR:
+        segmentNameGenerator =
+            new SimpleSegmentNameGenerator(tableName, 
segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX));
+        break;
+      case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        Preconditions.checkState(tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config 
must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = 
tableConfig.getValidationConfig();
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = 
timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(tableName, 
segmentNameGeneratorConfigs.get(SEGMENT_NAME_PREFIX),
+                
Boolean.valueOf(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
+                validationConfig.getSegmentPushType(), 
validationConfig.getSegmentPushFrequency(),
+                validationConfig.getTimeType(), timeFormat);
+        break;
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name 
generator type: " + segmentNameGeneratorType);
+    }
+
+    //init segment generation config
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setTableName(tableName);
+    segmentGeneratorConfig.setOutDir(_taskSpec.getOutputDirectoryPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+    segmentGeneratorConfig.setSequenceId(_taskSpec.getSequenceId());
+
+    //build segment
+    SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new 
SegmentIndexCreationDriverImpl();
+    segmentIndexCreationDriver.init(segmentGeneratorConfig, recordReader);
+    segmentIndexCreationDriver.build();
+    return segmentIndexCreationDriver.getSegmentName();
+  }
+}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
new file mode 100644
index 0000000..04fdbf3
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentGenerationTaskSpec.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
+import java.net.URI;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
+
+
+public class SegmentGenerationTaskSpec {
+
+  TableConfig _tableConfig;
+
+  Schema _schema;
+
+  RecordReaderSpec _recordReaderSpec;
+
+  SegmentNameGeneratorSpec _segmentNameGeneratorSpec;
+
+  String _inputFilePath;
+
+  String _outputDirectoryPath;
+
+  int _sequenceId;
+
+  public TableConfig getTableConfig() {
+    return _tableConfig;
+  }
+
+  public void setTableConfig(TableConfig tableConfig) {
+    _tableConfig = tableConfig;
+  }
+
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  public void setSchema(Schema schema) {
+    _schema = schema;
+  }
+
+  public RecordReaderSpec getRecordReaderSpec() {
+    return _recordReaderSpec;
+  }
+
+  public void setRecordReaderSpec(RecordReaderSpec recordReaderSpec) {
+    _recordReaderSpec = recordReaderSpec;
+  }
+
+  public SegmentNameGeneratorSpec getSegmentNameGeneratorSpec() {
+    return _segmentNameGeneratorSpec;
+  }
+
+  public void setSegmentNameGeneratorSpec(SegmentNameGeneratorSpec 
segmentNameGeneratorSpec) {
+    _segmentNameGeneratorSpec = segmentNameGeneratorSpec;
+  }
+
+  public String getInputFilePath() {
+    return _inputFilePath;
+  }
+
+  public void setInputFilePath(String inputFilePath) {
+    _inputFilePath = inputFilePath;
+  }
+
+  public String getOutputDirectoryPath() {
+    return _outputDirectoryPath;
+  }
+
+  public void setOutputDirectoryPath(String outputDirectoryPath) {
+    _outputDirectoryPath = outputDirectoryPath;
+  }
+
+  public int getSequenceId() {
+    return _sequenceId;
+  }
+
+  public void setSequenceId(int sequenceId) {
+    _sequenceId = sequenceId;
+  }
+}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
new file mode 100644
index 0000000..0b1c215
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/SegmentNameGeneratorSpec.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
+import java.util.Map;
+
+
/**
 * Selects a segment name generator by type name and carries its string-valued configs.
 * The configs map is stored by reference.
 */
public class SegmentNameGeneratorSpec {

  /** Generator type name. */
  String _type;

  /** Generator-specific key/value configs. */
  Map<String, String> _configs;

  public void setType(String type) {
    this._type = type;
  }

  public String getType() {
    return this._type;
  }

  public void setConfigs(Map<String, String> configs) {
    this._configs = configs;
  }

  public Map<String, String> getConfigs() {
    return this._configs;
  }
}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
new file mode 100644
index 0000000..ae2745e
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/common/TableSpec.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.common;
+
/**
 * Table section of a segment generation job spec: the table name plus the URIs of its schema and table
 * config. All values are stored exactly as supplied.
 */
public class TableSpec {

  String _tableName;

  String _schemaURI;

  String _tableConfigURI;

  public void setTableName(String tableName) {
    this._tableName = tableName;
  }

  public String getTableName() {
    return this._tableName;
  }

  public void setSchemaURI(String schemaURI) {
    this._schemaURI = schemaURI;
  }

  public String getSchemaURI() {
    return this._schemaURI;
  }

  public void setTableConfigURI(String tableConfigURI) {
    this._tableConfigURI = tableConfigURI;
  }

  public String getTableConfigURI() {
    return this._tableConfigURI;
  }
}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
new file mode 100644
index 0000000..d6127bc
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/SegmentGenerationJobRunner.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.standalone;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URI;
+import java.util.List;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.filesystem.PinotFSFactory;
+import org.apache.pinot.ingestion.common.PinotFSSpec;
+import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
+import org.apache.pinot.ingestion.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.ingestion.common.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+
+
+public class SegmentGenerationJobRunner {
+
+  private SegmentGenerationJobSpec _spec;
+
+  public SegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
+
+    _spec = spec;
+  }
+
+  public void run()
+      throws Exception {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
+      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), config);
+    }
+
+    //Get pinotFS for input
+    URI inputDirURI = new URI(_spec.getInputDirURI());
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+
+    //Get outputFS for writing output pinot segments
+    URI outputDirURI = new URI(_spec.getOutputDirURI());
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+    //Get list of files to process
+    String[] files = inputDirFS.listFiles(inputDirURI, true);
+    //TODO: sort input files based on creation time
+    //TODO: handle input file name filters
+
+    //create tempDirectory for input and output
+    File tempDirectory = FileUtils.getTempDirectory();
+    File localInputDir = new File(tempDirectory, "input");
+    FileUtils.forceMkdir(localInputDir);
+    File localOutputTempDirectory = new File(tempDirectory, "output");
+    FileUtils.forceMkdir(localOutputTempDirectory);
+
+    //Read TableConfig, Schema
+    String schemaJson = IOUtils.toString(new 
URI(_spec.getTableSpec().getSchemaURI()), "UTF-8");
+    Schema schema = Schema.fromString(schemaJson);
+    String tableConfigJson = IOUtils.toString(new 
URI(_spec.getTableSpec().getTableConfigURI()), "UTF-8");
+    TableConfig tableConfig = TableConfig.fromJsonString(tableConfigJson);
+    //iterate on the file list, for each
+    for (int i = 0; i < files.length; i++) {
+      //copy input path to local
+      File inputDataFile = new File(localInputDir, new 
File(files[i]).getName());
+      inputDirFS.copyToLocalFile(new URI(files[i]), inputDataFile);
+
+      //create taskspec
+      SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+      taskSpec.setInputFilePath(inputDataFile.getAbsolutePath());
+      
taskSpec.setOutputDirectoryPath(localOutputTempDirectory.getAbsolutePath());
+      taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+      taskSpec.setSchema(schema);
+      taskSpec.setTableConfig(tableConfig);
+      taskSpec.setSequenceId(i);
+      
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+
+      //invoke segmentGenerationTask
+      SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
+      String segmentName = taskRunner.run();
+
+      //move segment to output PinotFS
+
+      File outputSegmentFile = new File(localOutputTempDirectory, segmentName 
+ ".tar.gz");
+      outputDirFS.copyFromLocalFile(outputSegmentFile, outputDirURI);
+
+      FileUtils.deleteQuietly(outputSegmentFile);
+      FileUtils.deleteQuietly(inputDataFile);
+    }
+    //clean up
+    FileUtils.deleteDirectory(tempDirectory);
+  }
+}
diff --git 
a/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
new file mode 100644
index 0000000..b1d3741
--- /dev/null
+++ 
b/pinot-ingestion-jobs/pinot-standalone/src/main/java/org/apache/pinot/ingestion/standalone/StandaloneIngestionJobLauncher.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.standalone;
+
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.pinot.ingestion.common.SegmentGenerationJobSpec;
import org.yaml.snakeyaml.Yaml;
+
+
+public class StandaloneIngestionJobLauncher {
+
+
+  public static void main(String[] args) throws Exception{
+    String jobSpecFilePath = args[0];
+
+    try(Reader reader = new BufferedReader(new FileReader(jobSpecFilePath))) {
+
+      Yaml yaml = new Yaml();
+      SegmentGenerationJobSpec spec = yaml.loadAs(reader, 
SegmentGenerationJobSpec.class);
+      StringWriter sw = new StringWriter();
+      yaml.dump(spec, sw);
+      System.out.println("dump = " + sw.toString());
+
+    }
+  }
+}
diff --git a/pinot-ingestion-jobs/pom.xml b/pinot-ingestion-jobs/pom.xml
index b89a8f6..d9cffb3 100644
--- a/pinot-ingestion-jobs/pom.xml
+++ b/pinot-ingestion-jobs/pom.xml
@@ -41,6 +41,7 @@
     <module>pinot-ingestion-common</module>
     <module>pinot-hadoop</module>
     <module>pinot-spark</module>
+    <module>pinot-standalone</module>
   </modules>
 
   <dependencies>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to