This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch pinot-batch-ingestion-hadoop
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 8cbf1d2298787305817c8ebd8776703db5d949b8
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 15 04:29:23 2020 -0800

    Adding pinot-batch-ingestion-hadoop module
---
 .../pinot-batch-ingestion-hadoop/pom.xml           | 132 ++++++++++
 .../hadoop/HadoopSegmentGenerationJobRunner.java   | 285 +++++++++++++++++++++
 .../hadoop/HadoopSegmentTarPushJobRunner.java      | 107 ++++++++
 .../hadoop/HadoopSegmentUriPushJobRunner.java      | 107 ++++++++
 .../batch/hadoop/SegmentCreationMapper.java        | 205 +++++++++++++++
 .../segmentCreationAndTarPushJobSpec.yaml          |  45 ++++
 .../segmentCreationAndUriPushJobSpec.yaml          |  45 ++++
 .../src/main/resources/segmentCreationJobSpec.yaml |  42 +++
 .../src/main/resources/segmentTarPushJobSpec.yaml  |  45 ++++
 .../src/main/resources/segmentUriPushJobSpec.yaml  |  45 ++++
 pinot-plugins/pinot-batch-ingestion/pom.xml        |   1 +
 11 files changed, 1059 insertions(+)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml
new file mode 100644
index 0000000..3f0f5cd
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>pinot-batch-ingestion</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.3.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>pinot-batch-ingestion-hadoop</artifactId>
+  <name>Pinot Batch Ingestion for Hadoop</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <hadoop.version>2.7.0</hadoop.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-batch-ingestion-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math3</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>hadoop2</classifier>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.5</version>
+        <configuration>
+          <forceCreation>true</forceCreation>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
new file mode 100644
index 0000000..d773e2f
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import com.google.common.base.Preconditions;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+
+public class HadoopSegmentGenerationJobRunner extends Configured implements 
IngestionJobRunner, Serializable {
+
+  public static final String SEGMENT_GENERATION_JOB_SPEC = 
"segmentGenerationJobSpec";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentGenerationJobRunner.class);
+  private static final String DEPS_JAR_DIR = "dependencyJarDir";
+  private static final String STAGING_DIR = "stagingDir";
+  private SegmentGenerationJobSpec _spec;
+
+  public HadoopSegmentGenerationJobRunner() {
+  }
+
+  public HadoopSegmentGenerationJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+    if (_spec.getInputDirURI() == null) {
+      throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' 
file");
+    }
+    if (_spec.getOutputDirURI() == null) {
+      throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' 
file");
+    }
+    if (_spec.getRecordReaderSpec() == null) {
+      throw new RuntimeException("Missing property 'recordReaderSpec' in 
'jobSpec' file");
+    }
+    if (_spec.getTableSpec() == null) {
+      throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' 
file");
+    }
+    if (_spec.getTableSpec().getTableName() == null) {
+      throw new RuntimeException("Missing property 'tableName' in 
'tableSpec'");
+    }
+    if (_spec.getTableSpec().getSchemaURI() == null) {
+      if (_spec.getPinotClusterSpecs() == null || 
_spec.getPinotClusterSpecs().length == 0) {
+        throw new RuntimeException("Missing property 'schemaURI' in 
'tableSpec'");
+      }
+      PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
+      String schemaURI = SegmentGenerationUtils
+          .generateSchemaURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      _spec.getTableSpec().setSchemaURI(schemaURI);
+    }
+    if (_spec.getTableSpec().getTableConfigURI() == null) {
+      if (_spec.getPinotClusterSpecs() == null || 
_spec.getPinotClusterSpecs().length == 0) {
+        throw new RuntimeException("Missing property 'tableConfigURI' in 
'tableSpec'");
+      }
+      PinotClusterSpec pinotClusterSpec = _spec.getPinotClusterSpecs()[0];
+      String tableConfigURI = SegmentGenerationUtils
+          .generateTableConfigURI(pinotClusterSpec.getControllerURI(), 
_spec.getTableSpec().getTableName());
+      _spec.getTableSpec().setTableConfigURI(tableConfigURI);
+    }
+    if (_spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
+      _spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap<>());
+    }
+  }
+
+  @Override
+  public void run()
+      throws Exception {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
+      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), config);
+    }
+
+    //Get pinotFS for input
+    URI inputDirURI = new URI(_spec.getInputDirURI());
+    if (inputDirURI.getScheme() == null) {
+      inputDirURI = new File(_spec.getInputDirURI()).toURI();
+    }
+    PinotFS inputDirFS = PinotFSFactory.create(inputDirURI.getScheme());
+
+    //Get outputFS for writing output pinot segments
+    URI outputDirURI = new URI(_spec.getOutputDirURI());
+    if (outputDirURI.getScheme() == null) {
+      outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+    outputDirFS.mkdir(outputDirURI);
+
+    //Get staging directory for temporary output pinot segments
+    String stagingDir = 
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
+    Preconditions.checkNotNull(stagingDir, "Please set config: stagingDir 
under 'executionFrameworkSpec.extraConfigs'");
+    URI stagingDirURI = URI.create(stagingDir);
+    if (stagingDirURI.getScheme() == null) {
+      stagingDirURI = new File(stagingDir).toURI();
+    }
+    Path stagingInputDir = new Path(stagingDirURI.toString(), "input");
+    if (!outputDirURI.getScheme().equals(stagingDirURI.getScheme())) {
+      throw new RuntimeException(String
+          .format("The scheme of staging directory URI [%s] and output 
directory URI [%s] has to be same.",
+              stagingDirURI, outputDirURI));
+    }
+    outputDirFS.mkdir(stagingDirURI);
+
+    //Get list of files to process
+    String[] files = inputDirFS.listFiles(inputDirURI, true);
+
+    //TODO: sort input files based on creation time
+    List<String> filteredFiles = new ArrayList<>();
+    PathMatcher includeFilePathMatcher = null;
+    if (_spec.getIncludeFileNamePattern() != null) {
+      includeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getIncludeFileNamePattern());
+    }
+    PathMatcher excludeFilePathMatcher = null;
+    if (_spec.getExcludeFileNamePattern() != null) {
+      excludeFilePathMatcher = 
FileSystems.getDefault().getPathMatcher(_spec.getExcludeFileNamePattern());
+    }
+
+    for (String file : files) {
+      if (includeFilePathMatcher != null) {
+        if (!includeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (excludeFilePathMatcher != null) {
+        if (excludeFilePathMatcher.matches(Paths.get(file))) {
+          continue;
+        }
+      }
+      if (!inputDirFS.isDirectory(new URI(file))) {
+        filteredFiles.add(file);
+      }
+    }
+
+    int numDataFiles = filteredFiles.size();
+    if (numDataFiles == 0) {
+      String errorMessage = String
+          .format("No data file founded in [%s], with include file pattern: 
[%s] and exclude file  pattern [%s]",
+              _spec.getInputDirURI(), _spec.getIncludeFileNamePattern(), 
_spec.getExcludeFileNamePattern());
+      LOGGER.error(errorMessage);
+      throw new RuntimeException(errorMessage);
+    } else {
+      LOGGER.info("Creating segments with data files: {}", filteredFiles);
+      for (int i = 0; i < numDataFiles; i++) {
+        String dataFilePath = filteredFiles.get(i);
+
+        File localFile = new File("tmp");
+        try (DataOutputStream dataOutputStream = new DataOutputStream(new 
FileOutputStream(localFile))) {
+          dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath + " " + 
i));
+          dataOutputStream.flush();
+          outputDirFS.copyFromLocalFile(localFile, new Path(stagingInputDir, 
Integer.toString(i)).toUri());
+        }
+      }
+    }
+
+    try {
+      // Set up the job
+      Job job = Job.getInstance(getConf());
+      job.setJarByClass(getClass());
+      job.setJobName(getClass().getName());
+
+      org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
+      String hadoopTokenFileLocation = 
System.getenv("HADOOP_TOKEN_FILE_LOCATION");
+      if (hadoopTokenFileLocation != null) {
+        jobConf.set("mapreduce.job.credentials.binary", 
hadoopTokenFileLocation);
+      }
+      jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
+
+      // Add dependency jars
+      if 
(_spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) 
{
+        addDepsJarToDistributedCache(job, 
_spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
+      }
+
+      jobConf.set(SEGMENT_GENERATION_JOB_SPEC, new Yaml().dump(_spec));
+
+      job.setMapperClass(getMapperClass());
+      job.setNumReduceTasks(0);
+
+      job.setInputFormatClass(TextInputFormat.class);
+      job.setOutputFormatClass(TextOutputFormat.class);
+
+      job.setMapOutputKeyClass(LongWritable.class);
+      job.setMapOutputValueClass(Text.class);
+
+      FileInputFormat.addInputPath(job, stagingInputDir);
+      FileOutputFormat.setOutputPath(job, new Path(stagingDir, "output"));
+
+      // Submit the job
+      job.waitForCompletion(true);
+      if (!job.isSuccessful()) {
+        throw new RuntimeException("Job failed: " + job);
+      }
+
+      if (stagingDirURI != null) {
+        LOGGER.info("Trying to copy segment tars from staging directory: [{}] 
to output directory [{}]", stagingDirURI,
+            outputDirURI);
+        outputDirFS.copy(stagingDirURI, outputDirURI);
+      }
+    } finally {
+      if (stagingDirURI != null) {
+        LOGGER.info("Trying to clean up staging directory: [{}]", 
stagingDirURI);
+        outputDirFS.delete(stagingDirURI, true);
+      }
+    }
+  }
+
+  /**
+   * Can be overridden to plug in custom mapper.
+   */
+  protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> 
getMapperClass() {
+    return SegmentCreationMapper.class;
+  }
+
+  protected void addDepsJarToDistributedCache(Job job, String depsJarDir)
+      throws IOException {
+    if (depsJarDir != null) {
+      URI depsJarDirURI = URI.create(depsJarDir);
+      if (depsJarDirURI.getScheme() == null) {
+        depsJarDirURI = new File(depsJarDir).toURI();
+      }
+      PinotFS pinotFS = PinotFSFactory.create(depsJarDirURI.getScheme());
+      String[] files = pinotFS.listFiles(depsJarDirURI, true);
+      for (String file : files) {
+        URI fileURI = URI.create(file);
+        if (!pinotFS.isDirectory(fileURI)) {
+          if (file.endsWith(".jar")) {
+            LOGGER.info("Adding deps jar: {} to distributed cache", file);
+            job.addCacheArchive(fileURI);
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
new file mode 100644
index 0000000..f687b60
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentTarPushJobRunner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopSegmentTarPushJobRunner implements IngestionJobRunner, 
Serializable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentTarPushJobRunner.class);
+
+  private SegmentGenerationJobSpec _spec;
+
+  public HadoopSegmentTarPushJobRunner() {
+  }
+
+  public HadoopSegmentTarPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+  }
+
+  @Override
+  public void run() {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
+      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), config);
+    }
+
+    //Get outputFS for writing output pinot segments
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+    //Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
+    }
+
+    List<String> segmentsToPush = new ArrayList<>();
+    for (String file : files) {
+      if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        segmentsToPush.add(file);
+      }
+    }
+
+    int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
+    if (pushParallelism < 1) {
+      pushParallelism = segmentsToPush.size();
+    }
+    // Push from driver
+    try {
+      SegmentPushUtils.pushSegments(_spec, outputDirFS, segmentsToPush);
+    } catch (RetriableOperationException | AttemptsExceededException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
new file mode 100644
index 0000000..8358a0a
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopSegmentUriPushJobRunner implements IngestionJobRunner, 
Serializable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentUriPushJobRunner.class);
+
+  private SegmentGenerationJobSpec _spec;
+
+  public HadoopSegmentUriPushJobRunner() {
+  }
+
+  public HadoopSegmentUriPushJobRunner(SegmentGenerationJobSpec spec) {
+    init(spec);
+  }
+
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+    if (_spec.getPushJobSpec() == null) {
+      throw new RuntimeException("Missing PushJobSpec");
+    }
+  }
+
+  @Override
+  public void run() {
+    //init all file systems
+    List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
+    for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
+      Configuration config = new MapConfiguration(pinotFSSpec.getConfigs());
+      PinotFSFactory.register(pinotFSSpec.getScheme(), 
pinotFSSpec.getClassName(), config);
+    }
+
+    //Get outputFS for writing output Pinot segments
+    URI outputDirURI;
+    try {
+      outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("outputDirURI is not valid - '" + 
_spec.getOutputDirURI() + "'");
+    }
+    PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+    //Get list of files to process
+    String[] files;
+    try {
+      files = outputDirFS.listFiles(outputDirURI, true);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list all files under outputDirURI 
- '" + outputDirURI + "'");
+    }
+    List<String> segmentUris = new ArrayList<>();
+    for (String file : files) {
+      URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
+        segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + 
uri.getRawPath() + _spec.getPushJobSpec()
+            .getSegmentUriSuffix());
+      }
+    }
+    // Push from driver
+    try {
+      SegmentPushUtils.sendSegmentUris(_spec, segmentUris);
+    } catch (RetriableOperationException | AttemptsExceededException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java
new file mode 100644
index 0000000..de98972
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/SegmentCreationMapper.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.ingestion.batch.hadoop;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import 
org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
+import org.apache.pinot.spi.utils.DataSize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import static 
org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner.SEGMENT_GENERATION_JOB_SPEC;
+
+
+public class SegmentCreationMapper extends Mapper<LongWritable, Text, 
LongWritable, Text> {
+  protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
+  protected static final String PROGRESS_REPORTER_THREAD_NAME = 
"pinot-hadoop-progress-reporter";
+  protected static final String SEGMENT_TAR_DIR = "segmentTar";
+  protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5_000L;
+
+  protected final Logger LOGGER = 
LoggerFactory.getLogger(SegmentCreationMapper.class);
+
+  protected Configuration _jobConf;
+  protected SegmentGenerationJobSpec _spec;
+  private File _localTempDir;
+
+  @Override
+  public void setup(Context context)
+      throws IOException, InterruptedException {
+    _jobConf = context.getConfiguration();
+    Yaml yaml = new Yaml();
+    String segmentGenerationJobSpecStr = 
_jobConf.get(SEGMENT_GENERATION_JOB_SPEC);
+    _spec = yaml.loadAs(segmentGenerationJobSpecStr, 
SegmentGenerationJobSpec.class);
+    LOGGER.info("Segment generation job spec : {}", 
segmentGenerationJobSpecStr);
+    _localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + 
System.currentTimeMillis());
+  }
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context)
+      throws IOException, InterruptedException {
+    try {
+      String[] splits = StringUtils.split(value.toString(), ' ');
+      Preconditions.checkState(splits.length == 2, "Illegal input value: {}", 
value);
+
+      String path = splits[0];
+      int idx = Integer.valueOf(splits[1]);
+      LOGGER.info("Generating segment with input file: {}, sequence id: {}", 
path, idx);
+
+      URI inputDirURI = new URI(_spec.getInputDirURI());
+      if (inputDirURI.getScheme() == null) {
+        inputDirURI = new File(_spec.getInputDirURI()).toURI();
+      }
+
+      URI outputDirURI = new URI(_spec.getOutputDirURI());
+      if (outputDirURI.getScheme() == null) {
+        outputDirURI = new File(_spec.getOutputDirURI()).toURI();
+      }
+      PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
+
+      URI inputFileURI = URI.create(path);
+      if (inputFileURI.getScheme() == null) {
+        inputFileURI =
+            new URI(inputDirURI.getScheme(), 
inputFileURI.getSchemeSpecificPart(), inputFileURI.getFragment());
+      }
+
+      //create localTempDir for input and output
+      File localInputTempDir = new File(_localTempDir, "input");
+      FileUtils.forceMkdir(localInputTempDir);
+      File localOutputTempDir = new File(_localTempDir, "output");
+      FileUtils.forceMkdir(localOutputTempDir);
+
+      //copy input path to local
+      File localInputDataFile = new File(localInputTempDir, new 
File(inputFileURI).getName());
+      
PinotFSFactory.create(inputFileURI.getScheme()).copyToLocalFile(inputFileURI, 
localInputDataFile);
+
+      //create task spec
+      SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
+      taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
+      taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
+      taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
+      
taskSpec.setSchema(SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI()));
+      taskSpec
+          
.setTableConfig(SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI()).toJsonNode());
+      taskSpec.setSequenceId(idx);
+      
taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
+
+      // Start a thread that reports progress every minute during segment 
generation to prevent job getting killed
+      Thread progressReporterThread = new Thread(getProgressReporter(context));
+      progressReporterThread.setName(PROGRESS_REPORTER_THREAD_NAME);
+      progressReporterThread.start();
+      String segmentName;
+      try {
+        SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
+        segmentName = taskRunner.run();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while creating segment with input file: 
{}, sequence id: {}", path, idx, e);
+        throw new RuntimeException(e);
+      } finally {
+        progressReporterThread.interrupt();
+        progressReporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS);
+        if (progressReporterThread.isAlive()) {
+          LOGGER.error("Failed to interrupt progress reporter thread: {}", 
progressReporterThread);
+        }
+      }
+
+      // Tar segment directory to compress file
+      File localSegmentDir = new File(localOutputTempDir, segmentName);
+      String segmentTarFileName = segmentName + Constants.TAR_GZ_FILE_EXT;
+      File localSegmentTarFile = new File(localOutputTempDir, 
segmentTarFileName);
+      LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, 
localSegmentTarFile);
+      TarGzCompressionUtils.createTarGzOfDirectory(localSegmentDir.getPath(), 
localSegmentTarFile.getPath());
+      long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
+      long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
+      LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", 
segmentName,
+          DataSize.fromBytes(uncompressedSegmentSize), 
DataSize.fromBytes(compressedSegmentSize));
+      //move segment to output PinotFS
+      URI outputSegmentTarURI = 
SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, 
outputDirURI)
+          .resolve(segmentTarFileName);
+      LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", 
localSegmentTarFile, outputSegmentTarURI);
+      if (!_spec.isOverwriteOutput() && 
outputDirFS.exists(outputSegmentTarURI)) {
+        LOGGER.warn("Not overwrite existing output segment tar file: {}", 
outputDirFS.exists(outputSegmentTarURI));
+      } else {
+        outputDirFS.copyFromLocalFile(localSegmentTarFile, 
outputSegmentTarURI);
+      }
+      FileUtils.deleteQuietly(localSegmentDir);
+      FileUtils.deleteQuietly(localSegmentTarFile);
+      FileUtils.deleteQuietly(localInputDataFile);
+
+      context.write(new LongWritable(idx), new Text(segmentTarFileName));
+      LOGGER.info("Finish generating segment: {} with input file: {}, sequence 
id: {}", segmentName, inputFileURI, idx);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      FileUtils.deleteQuietly(_localTempDir);
+    }
+  }
+
+  protected Runnable getProgressReporter(Context context) {
+    return new ProgressReporter(context);
+  }
+
+  @Override
+  public void cleanup(Context context) {
+    LOGGER.info("Deleting local temporary directory: {}", _localTempDir);
+    FileUtils.deleteQuietly(_localTempDir);
+  }
+
+  private static class ProgressReporter implements Runnable {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProgressReporter.class);
+    private static final long PROGRESS_REPORTER_INTERVAL_MS = 60_000L;
+
+    private final Context _context;
+
+    ProgressReporter(Context context) {
+      _context = context;
+    }
+
+    @Override
+    public void run() {
+      LOGGER.info("Starting progress reporter thread: {}", 
Thread.currentThread());
+      while (true) {
+        try {
+          Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS);
+          LOGGER.info("============== Reporting progress ==============");
+          _context.progress();
+        } catch (InterruptedException e) {
+          LOGGER.info("Progress reporter thread: {} interrupted", 
Thread.currentThread());
+          return;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
new file mode 100644
index 0000000..b8fbf1a
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndTarPushJobSpec.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Execution framework used to run this ingestion job and the runner class for each supported job type.
executionFrameworkSpec:
  name: 'hadoop'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# Generate segments, then push the segment tar files to the Pinot controller.
jobType: SegmentCreationAndTarPush
# Root directory of the input data files.
inputDirURI: 'file:///path/to/input'
# Only files matching this pattern are used as segment input.
includeFileNamePattern: 'glob:**/*.parquet'
# Files matching this pattern are skipped.
excludeFileNamePattern: 'glob:**/*.avro'
# Directory where generated segment tar files are written.
outputDirURI: 'file:///path/to/output'
# Whether to overwrite segment tar files that already exist in the output directory.
overwriteOutput: true
# Filesystem implementations available to the job, keyed by URI scheme.
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
# Record reader used to parse the input files.
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
# Target Pinot table, and the endpoints to fetch its schema and table config from.
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
# Pinot cluster(s) whose controller receives the pushed segments.
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
# Retry behavior for the segment push.
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
new file mode 100644
index 0000000..f0e02e1
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationAndUriPushJobSpec.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Execution framework used to run this ingestion job and the runner class for each supported job type.
executionFrameworkSpec:
  name: 'hadoop'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# Generate segments, then push the segment download URIs (not the tar bytes) to the controller.
jobType: SegmentCreationAndUriPush
# Root directory of the input data files.
inputDirURI: 'file:///path/to/input'
# Only files matching this pattern are used as segment input.
includeFileNamePattern: 'glob:**/*.parquet'
# Files matching this pattern are skipped.
excludeFileNamePattern: 'glob:**/*.avro'
# Directory where generated segment tar files are written.
outputDirURI: 'file:///path/to/output'
# Whether to overwrite segment tar files that already exist in the output directory.
overwriteOutput: true
# Filesystem implementations available to the job, keyed by URI scheme.
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
# Record reader used to parse the input files.
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
# Target Pinot table, and the endpoints to fetch its schema and table config from.
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
# Pinot cluster(s) whose controller receives the pushed segment URIs.
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
# Prefix/suffix wrapped around each segment path to build the URI sent to the controller.
pushJobSpec:
  segmentUriPrefix: 'file://'
  segmentUriSuffix: ''
\ No newline at end of file
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml
new file mode 100644
index 0000000..a8b667b
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentCreationJobSpec.yaml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Execution framework used to run this ingestion job and the runner class for each supported job type.
executionFrameworkSpec:
  name: 'hadoop'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# Generate segments only; no push is performed (hence no pushJobSpec below).
jobType: SegmentCreation
# Root directory of the input data files.
inputDirURI: 'file:///path/to/input'
# Only files matching this pattern are used as segment input.
includeFileNamePattern: 'glob:**/*.parquet'
# Files matching this pattern are skipped.
excludeFileNamePattern: 'glob:**/*.avro'
# Directory where generated segment tar files are written.
outputDirURI: 'file:///path/to/output'
# Whether to overwrite segment tar files that already exist in the output directory.
overwriteOutput: true
# Filesystem implementations available to the job, keyed by URI scheme.
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
# Record reader used to parse the input files.
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
# Target Pinot table, and the endpoints to fetch its schema and table config from.
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
# Pinot cluster(s) this job is associated with.
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml
new file mode 100644
index 0000000..7ce85af
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentTarPushJobSpec.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Execution framework used to run this ingestion job and the runner class for each supported job type.
executionFrameworkSpec:
  name: 'hadoop'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# Push pre-built segment tar files from outputDirURI to the Pinot controller; no segment generation.
jobType: SegmentTarPush
# Root directory of the input data files (unused by a push-only job, kept for spec completeness).
inputDirURI: 'file:///path/to/input'
# Only files matching this pattern are used as segment input.
includeFileNamePattern: 'glob:**/*.parquet'
# Files matching this pattern are skipped.
excludeFileNamePattern: 'glob:**/*.avro'
# Directory containing the segment tar files to push.
outputDirURI: 'file:///path/to/output'
# Whether to overwrite segment tar files that already exist in the output directory.
overwriteOutput: true
# Filesystem implementations available to the job, keyed by URI scheme.
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
# Record reader used to parse the input files.
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
# Target Pinot table, and the endpoints to fetch its schema and table config from.
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
# Pinot cluster(s) whose controller receives the pushed segments.
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
# Retry behavior for the segment push.
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
\ No newline at end of file
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml
new file mode 100644
index 0000000..cf65805
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/resources/segmentUriPushJobSpec.yaml
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Execution framework used to run this ingestion job and the runner class for each supported job type.
executionFrameworkSpec:
  name: 'hadoop'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentUriPushJobRunner'
# Push download URIs of pre-built segments in outputDirURI to the controller; no segment generation.
jobType: SegmentUriPush
# Root directory of the input data files (unused by a push-only job, kept for spec completeness).
inputDirURI: 'file:///path/to/input'
# Only files matching this pattern are used as segment input.
includeFileNamePattern: 'glob:**/*.parquet'
# Files matching this pattern are skipped.
excludeFileNamePattern: 'glob:**/*.avro'
# Directory containing the segment tar files whose URIs are pushed.
outputDirURI: 'file:///path/to/output'
# Whether to overwrite segment tar files that already exist in the output directory.
overwriteOutput: true
# Filesystem implementations available to the job, keyed by URI scheme.
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
# Record reader used to parse the input files.
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.parquet.data.readers.ParquetRecordReader'
# Target Pinot table, and the endpoints to fetch its schema and table config from.
tableSpec:
  tableName: 'myTable'
  schemaURI: 'http://localhost:9000/tables/myTable/schema'
  tableConfigURI: 'http://localhost:9000/tables/myTable'
# Pinot cluster(s) whose controller receives the pushed segment URIs.
pinotClusterSpecs:
  - controllerURI: 'localhost:9000'
# Prefix/suffix wrapped around each segment path to build the URI sent to the controller.
pushJobSpec:
  segmentUriPrefix: 'file://'
  segmentUriSuffix: ''
\ No newline at end of file
diff --git a/pinot-plugins/pinot-batch-ingestion/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/pom.xml
index 9f0c00b..92b2e93 100644
--- a/pinot-plugins/pinot-batch-ingestion/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/pom.xml
@@ -40,6 +40,7 @@
   <modules>
     <module>pinot-batch-ingestion-common</module>
     <module>pinot-batch-ingestion-spark</module>
+    <module>pinot-batch-ingestion-hadoop</module>
     <module>pinot-batch-ingestion-standalone</module>
     <module>v0_deprecated</module>
   </modules>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to