This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch support-spark-preprocessing
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6231d1a22d7b8e665c73aac90f02d3e9ab8099c8
Author: Jack Li(Analytics Engineering) <[email protected]>
AuthorDate: Thu Jul 22 16:05:03 2021 -0700

    Support data preprocessing in Spark framework
---
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  | 166 +--------------
 .../hadoop/job/mappers/SegmentCreationMapper.java  |   2 +-
 .../job/preprocess/DataPreprocessingHelper.java    | 228 ---------------------
 .../HadoopAvroDataPreprocessingHelper.java         |  49 +++++
 .../preprocess/HadoopDataPreprocessingHelper.java  | 149 ++++++++++++++
 .../HadoopDataPreprocessingHelperFactory.java      |  39 ++++
 .../hadoop/job/preprocess/HadoopJobPreparer.java   |  10 +
 .../HadoopOrcDataPreprocessingHelper.java          |  48 +++++
 .../v0_deprecated/pinot-ingestion-common/pom.xml   |   8 +
 .../ingestion/jobs/SegmentPreprocessingJob.java    | 172 +++++++++++++++-
 .../preprocess/AvroDataPreprocessingHelper.java    |  39 ++--
 .../preprocess/DataPreprocessingHelper.java        | 118 +++++++++++
 .../preprocess/DataPreprocessingHelperFactory.java |   4 +-
 .../preprocess/OrcDataPreprocessingHelper.java     |  39 ++--
 .../preprocess/SampleTimeColumnExtractable.java    |   9 +
 .../mappers/AvroDataPreprocessingMapper.java       |   6 +-
 .../mappers/OrcDataPreprocessingMapper.java        |   8 +-
 .../mappers/SegmentPreprocessingMapper.java        |   4 +-
 .../AvroDataPreprocessingPartitioner.java          |   4 +-
 .../partitioners/GenericPartitioner.java           |   8 +-
 .../OrcDataPreprocessingPartitioner.java           |   6 +-
 .../partitioners/PartitionFunctionFactory.java     |   2 +-
 .../reducers/AvroDataPreprocessingReducer.java     |   4 +-
 .../reducers/OrcDataPreprocessingReducer.java      |   4 +-
 .../ingestion/utils}/DataPreprocessingUtils.java   |   2 +-
 .../ingestion/utils}/InternalConfigConstants.java  |   2 +-
 .../ingestion}/utils/preprocess/DataFileUtils.java |   2 +-
 .../ingestion}/utils/preprocess/HadoopUtils.java   |   2 +-
 .../ingestion}/utils/preprocess/OrcUtils.java      |   2 +-
 .../utils/preprocess/TextComparator.java           |   2 +-
 .../spark/jobs/SparkSegmentPreprocessingJob.java   |  89 ++++++++
 .../SparkAvroDataPreprocessingHelper.java          |  39 ++++
 .../preprocess/SparkDataPreprocessingHelper.java   |  27 +++
 .../SparkDataPreprocessingHelperFactory.java       |  39 ++++
 .../SparkOrcDataPreprocessingHelper.java           |  16 ++
 .../org/apache/pinot/spark/utils}/HadoopUtils.java |   2 +-
 36 files changed, 871 insertions(+), 479 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 81a9902..9707487 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -18,33 +18,19 @@
  */
 package org.apache.pinot.hadoop.job;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelper;
-import org.apache.pinot.hadoop.job.preprocess.DataPreprocessingHelperFactory;
+import 
org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelperFactory;
+import org.apache.pinot.hadoop.job.preprocess.HadoopDataPreprocessingHelper;
 import org.apache.pinot.hadoop.utils.PinotHadoopJobPreparationHelper;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableCustomConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,21 +45,6 @@ import org.slf4j.LoggerFactory;
 public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopSegmentPreprocessingJob.class);
 
-  private String _partitionColumn;
-  private int _numPartitions;
-  private String _partitionFunction;
-
-  private String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private org.apache.pinot.spi.data.Schema _pinotTableSchema;
-
-  private Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
-
   public HadoopSegmentPreprocessingJob(final Properties properties) {
     super(properties);
   }
@@ -96,8 +67,8 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     // Cleans up preprocessed output dir if exists
     cleanUpPreprocessedOutputs(_preprocessedOutputDir);
 
-    DataPreprocessingHelper dataPreprocessingHelper =
-        
DataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir,
 _preprocessedOutputDir);
+    HadoopDataPreprocessingHelper dataPreprocessingHelper =
+        
HadoopDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir,
 _preprocessedOutputDir);
     dataPreprocessingHelper
         .registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, 
_numPartitions, _partitionFunction,
             _sortingColumn, _sortingColumnType, _numOutputFiles, 
_maxNumRecordsPerFile);
@@ -126,124 +97,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
     LOGGER.info("Finished pre-processing job in {}ms", 
(System.currentTimeMillis() - startTime));
   }
 
-  private void fetchPreProcessingOperations() {
-    _preprocessingOperations = new HashSet<>();
-    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
-    if (customConfig != null) {
-      Map<String, String> customConfigMap = customConfig.getCustomConfigs();
-      if (customConfigMap != null && !customConfigMap.isEmpty()) {
-        String preprocessingOperationsString =
-            
customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
-        DataPreprocessingUtils.getOperations(_preprocessingOperations, 
preprocessingOperationsString);
-      }
-    }
-  }
-
-  private void fetchPartitioningConfig() {
-    // Fetch partition info from table config.
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION))
 {
-      LOGGER.info("Partitioning is disabled.");
-      return;
-    }
-    SegmentPartitionConfig segmentPartitionConfig = 
_tableConfig.getIndexingConfig().getSegmentPartitionConfig();
-    if (segmentPartitionConfig != null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap = 
segmentPartitionConfig.getColumnPartitionMap();
-      Preconditions
-          .checkArgument(columnPartitionMap.size() <= 1, "There should be at 
most 1 partition setting in the table.");
-      if (columnPartitionMap.size() == 1) {
-        _partitionColumn = columnPartitionMap.keySet().iterator().next();
-        _numPartitions = 
segmentPartitionConfig.getNumPartitions(_partitionColumn);
-        _partitionFunction = 
segmentPartitionConfig.getFunctionName(_partitionColumn);
-      }
-    } else {
-      LOGGER.info("Segment partition config is null for table: {}", 
_tableConfig.getTableName());
-    }
-  }
-
-  private void fetchSortingConfig() {
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
-      LOGGER.info("Sorting is disabled.");
-      return;
-    }
-    // Fetch sorting info from table config first.
-    List<String> sortingColumns = new ArrayList<>();
-    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
-    if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
-      for (FieldConfig fieldConfig : fieldConfigs) {
-        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
-          sortingColumns.add(fieldConfig.getName());
-        }
-      }
-    }
-    if (!sortingColumns.isEmpty()) {
-      Preconditions.checkArgument(sortingColumns.size() == 1, "There should be 
at most 1 sorted column in the table.");
-      _sortingColumn = sortingColumns.get(0);
-      return;
-    }
-
-    // There is no sorted column specified in field configs, try to find 
sorted column from indexing config.
-    IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
-    List<String> sortedColumns = indexingConfig.getSortedColumn();
-    if (sortedColumns != null) {
-      Preconditions.checkArgument(sortedColumns.size() <= 1, "There should be 
at most 1 sorted column in the table.");
-      if (sortedColumns.size() == 1) {
-        _sortingColumn = sortedColumns.get(0);
-        FieldSpec fieldSpec = 
_pinotTableSchema.getFieldSpecFor(_sortingColumn);
-        Preconditions.checkState(fieldSpec != null, "Failed to find sorting 
column: {} in the schema", _sortingColumn);
-        Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "Cannot sort on 
multi-value column: %s", _sortingColumn);
-        _sortingColumnType = fieldSpec.getDataType();
-        Preconditions
-            .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort 
on %s column: %s", _sortingColumnType,
-                _sortingColumn);
-        LOGGER.info("Sorting the data with column: {} of type: {}", 
_sortingColumn, _sortingColumnType);
-      }
-    }
-  }
-
-  private void fetchResizingConfig() {
-    if 
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
-      LOGGER.info("Resizing is disabled.");
-      return;
-    }
-    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
-    if (tableCustomConfig == null) {
-      _numOutputFiles = 0;
-      return;
-    }
-    Map<String, String> customConfigsMap = 
tableCustomConfig.getCustomConfigs();
-    if (customConfigsMap != null && 
customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS))
 {
-      _numOutputFiles = 
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
-      Preconditions.checkState(_numOutputFiles > 0, String
-          .format("The value of %s should be positive! Current value: %s",
-              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, 
_numOutputFiles));
-    } else {
-      _numOutputFiles = 0;
-    }
-
-    if (customConfigsMap != null) {
-      int maxNumRecords;
-      if 
(customConfigsMap.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE))
 {
-        LOGGER.warn("The config: {} from custom config is deprecated. Use {} 
instead.",
-            InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
-            InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
-        maxNumRecords = 
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE));
-      } else if 
(customConfigsMap.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE))
 {
-        maxNumRecords =
-            
Integer.parseInt(customConfigsMap.get(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE));
-      } else {
-        return;
-      }
-      // TODO: add a in-built maximum value for this config to avoid having 
too many small files.
-      // E.g. if the config is set to 1 which is smaller than this in-built 
value, the job should be abort from generating too many small files.
-      Preconditions.checkArgument(maxNumRecords > 0,
-          "The value of " + 
InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
-              + " should be positive. Current value: " + maxNumRecords);
-      LOGGER.info("Setting {} to {}", 
InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
-      _maxNumRecordsPerFile = maxNumRecords;
-    }
-  }
-
   @Override
   protected Schema getSchema()
       throws IOException {
@@ -265,15 +118,6 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
   protected void addAdditionalJobProperties(Job job) {
   }
 
-  private void setTableConfigAndSchema()
-      throws IOException {
-    _tableConfig = getTableConfig();
-    _pinotTableSchema = getSchema();
-
-    Preconditions.checkState(_tableConfig != null, "Table config cannot be 
null.");
-    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be 
null");
-  }
-
   /**
    * Cleans up outputs in preprocessed output directory.
    */
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 6927d5d..14955c9 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
deleted file mode 100644
index a505d09..0000000
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelper.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.hadoop.job.preprocess;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.job.partitioners.GenericPartitioner;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
-import org.apache.pinot.hadoop.utils.preprocess.TextComparator;
-import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class DataPreprocessingHelper {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataPreprocessingHelper.class);
-
-  String _partitionColumn;
-  int _numPartitions;
-  String _partitionFunction;
-
-  String _sortingColumn;
-  private FieldSpec.DataType _sortingColumnType;
-
-  private int _numOutputFiles;
-  private int _maxNumRecordsPerFile;
-
-  private TableConfig _tableConfig;
-  private Schema _pinotTableSchema;
-
-  List<Path> _inputDataPaths;
-  Path _sampleRawDataPath;
-  Path _outputPath;
-
-  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
-    _inputDataPaths = inputDataPaths;
-    _sampleRawDataPath = inputDataPaths.get(0);
-    _outputPath = outputPath;
-  }
-
-  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, 
String partitionColumn, int numPartitions,
-      String partitionFunction, String sortingColumn, FieldSpec.DataType 
sortingColumnType, int numOutputFiles,
-      int maxNumRecordsPerFile) {
-    _tableConfig = tableConfig;
-    _pinotTableSchema = tableSchema;
-    _partitionColumn = partitionColumn;
-    _numPartitions = numPartitions;
-    _partitionFunction = partitionFunction;
-
-    _sortingColumn = sortingColumn;
-    _sortingColumnType = sortingColumnType;
-
-    _numOutputFiles = numOutputFiles;
-    _maxNumRecordsPerFile = maxNumRecordsPerFile;
-  }
-
-  public Job setUpJob()
-      throws IOException {
-    LOGGER.info("Initializing a pre-processing job");
-    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
-    Configuration jobConf = job.getConfiguration();
-    // Input and output paths.
-    int numInputPaths = _inputDataPaths.size();
-    jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
-    setValidationConfigs(job, _sampleRawDataPath);
-    for (Path inputFile : _inputDataPaths) {
-      FileInputFormat.addInputPath(job, inputFile);
-    }
-    setHadoopJobConfigs(job);
-
-    // Sorting column
-    if (_sortingColumn != null) {
-      LOGGER.info("Adding sorting column: {} to job config", _sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, 
_sortingColumn);
-      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, 
_sortingColumnType.name());
-
-      switch (_sortingColumnType) {
-        case INT:
-          job.setMapOutputKeyClass(IntWritable.class);
-          break;
-        case LONG:
-          job.setMapOutputKeyClass(LongWritable.class);
-          break;
-        case FLOAT:
-          job.setMapOutputKeyClass(FloatWritable.class);
-          break;
-        case DOUBLE:
-          job.setMapOutputKeyClass(DoubleWritable.class);
-          break;
-        case STRING:
-          job.setMapOutputKeyClass(Text.class);
-          job.setSortComparatorClass(TextComparator.class);
-          break;
-        default:
-          throw new IllegalStateException();
-      }
-    } else {
-      job.setMapOutputKeyClass(NullWritable.class);
-    }
-
-    // Partition column
-    int numReduceTasks = 0;
-    if (_partitionColumn != null) {
-      numReduceTasks = _numPartitions;
-      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
-      job.setPartitionerClass(GenericPartitioner.class);
-      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, 
_partitionColumn);
-      if (_partitionFunction != null) {
-        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, 
_partitionFunction);
-      }
-      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, 
numReduceTasks);
-      job.setPartitionerClass(getPartitioner());
-    } else {
-      if (_numOutputFiles > 0) {
-        numReduceTasks = _numOutputFiles;
-      } else {
-        // default number of input paths
-        numReduceTasks = _inputDataPaths.size();
-      }
-    }
-    // Maximum number of records per output file
-    jobConf
-        .set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, 
Integer.toString(_maxNumRecordsPerFile));
-    // Number of reducers
-    LOGGER.info("Number of reduce tasks for pre-processing job: {}", 
numReduceTasks);
-    job.setNumReduceTasks(numReduceTasks);
-
-    setUpMapperReducerConfigs(job);
-
-    return job;
-  }
-
-  abstract Class<? extends Partitioner> getPartitioner();
-
-  abstract void setUpMapperReducerConfigs(Job job)
-      throws IOException;
-
-  abstract String getSampleTimeColumnValue(String timeColumnName)
-      throws IOException;
-
-  private void setValidationConfigs(Job job, Path path)
-      throws IOException {
-    SegmentsValidationAndRetentionConfig validationConfig = 
_tableConfig.getValidationConfig();
-
-    // TODO: Serialize and deserialize validation config by creating toJson 
and fromJson
-    // If the use case is an append use case, check that one time unit is 
contained in one file. If there is more than one,
-    // the job should be disabled, as we should not resize for these use 
cases. Therefore, setting the time column name
-    // and value
-    if 
(IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig).equalsIgnoreCase("APPEND"))
 {
-      job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
-      String timeColumnName = validationConfig.getTimeColumnName();
-      job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, 
timeColumnName);
-      if (timeColumnName != null) {
-        DateTimeFieldSpec dateTimeFieldSpec = 
_pinotTableSchema.getSpecForTimeColumn(timeColumnName);
-        if (dateTimeFieldSpec != null) {
-          DateTimeFormatSpec formatSpec = new 
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-          
job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_TYPE, 
formatSpec.getColumnUnit().toString());
-          job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, 
formatSpec.getTimeFormat().toString());
-          
job.getConfiguration().set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, 
formatSpec.getSDFPattern());
-        }
-      }
-      
job.getConfiguration().set(InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
-          
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
-
-      String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
-      if (sampleTimeColumnValue != null) {
-        job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_VALUE, 
sampleTimeColumnValue);
-      }
-    }
-  }
-
-  private void setHadoopJobConfigs(Job job) {
-    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
-    job.setJobName(getClass().getName());
-    FileOutputFormat.setOutputPath(job, _outputPath);
-    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
-    // Turn this on to always firstly use class paths that user specifies.
-    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, 
"true");
-    // Turn this off since we don't need an empty file in the output directory
-    
job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
 "false");
-
-    String hadoopTokenFileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    if (hadoopTokenFileLocation != null) {
-      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, 
hadoopTokenFileLocation);
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..89c836c
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopAvroDataPreprocessingHelper.java
@@ -0,0 +1,49 @@
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroJob;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.AvroMultipleOutputs;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import 
org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopAvroDataPreprocessingHelper extends 
HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopAvroDataPreprocessingHelper.class);
+
+  public HadoopAvroDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Schema avroSchema = (Schema) 
getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
+    validateConfigsAgainstSchema(avroSchema);
+
+    job.setInputFormatClass(AvroKeyInputFormat.class);
+    job.setMapperClass(AvroDataPreprocessingMapper.class);
+
+    job.setReducerClass(AvroDataPreprocessingReducer.class);
+    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, 
avroSchema);
+    AvroMultipleOutputs.setCountersEnabled(job, true);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
+    job.setOutputKeyClass(AvroKey.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    AvroJob.setInputKeySchema(job, avroSchema);
+    AvroJob.setMapOutputValueSchema(job, avroSchema);
+    AvroJob.setOutputKeySchema(job, avroSchema);
+  }
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
new file mode 100644
index 0000000..d71575f
--- /dev/null
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelper.java
@@ -0,0 +1,149 @@
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pinot.hadoop.job.HadoopSegmentPreprocessingJob;
+import org.apache.pinot.ingestion.preprocess.partitioners.GenericPartitioner;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
+import org.apache.pinot.ingestion.utils.preprocess.TextComparator;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class that turns the settings held by a {@link DataPreprocessingHelper} into a configured Hadoop
+ * MapReduce {@link Job}.
+ *
+ * <p>{@link #setUpJob()} applies the format-independent settings (input/output paths, sorting, partitioning,
+ * number of reducers) and then calls {@link #setUpMapperReducerConfigs(Job)}, which each concrete subclass
+ * implements for its own data format.
+ */
+public abstract class HadoopDataPreprocessingHelper implements HadoopJobPreparer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelper.class);
+
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+  /**
+   * @param dataPreprocessingHelper helper whose fields (input paths, output path, partition/sort/resize settings)
+   *     are read when the job is set up
+   */
+  public HadoopDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
+  }
+
+  /**
+   * Creates and configures the pre-processing job.
+   *
+   * <p>The map output key class follows the sorting column type ({@link NullWritable} when there is no sorting
+   * column). The number of reduce tasks is the number of partitions when a partition column is set, otherwise
+   * the configured number of output files, otherwise the number of input paths.
+   *
+   * @return the configured job; it is not submitted here
+   * @throws IOException if the job cannot be created or a format-specific setup step fails
+   * @throws IllegalStateException if a sorting column is set without a data type, or with a data type that has no
+   *     matching map output key class
+   */
+  public Job setUpJob()
+      throws IOException {
+    LOGGER.info("Initializing a pre-processing job");
+    Job job = Job.getInstance(HadoopUtils.DEFAULT_CONFIGURATION);
+    Configuration jobConf = job.getConfiguration();
+    // Input and output paths.
+    int numInputPaths = _dataPreprocessingHelper._inputDataPaths.size();
+    jobConf.setInt(JobContext.NUM_MAPS, numInputPaths);
+    _dataPreprocessingHelper.setValidationConfigs(job, _dataPreprocessingHelper._sampleRawDataPath);
+    for (Path inputFile : _dataPreprocessingHelper._inputDataPaths) {
+      FileInputFormat.addInputPath(job, inputFile);
+    }
+    setHadoopJobConfigs(job);
+
+    // Sorting column
+    String sortingColumn = _dataPreprocessingHelper._sortingColumn;
+    if (sortingColumn != null) {
+      FieldSpec.DataType sortingColumnType = _dataPreprocessingHelper._sortingColumnType;
+      // Fail with a clear message instead of a bare NullPointerException from name()/switch below.
+      if (sortingColumnType == null) {
+        throw new IllegalStateException("Data type is not set for sorting column: " + sortingColumn);
+      }
+      LOGGER.info("Adding sorting column: {} to job config", sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_CONFIG, sortingColumn);
+      jobConf.set(InternalConfigConstants.SORTING_COLUMN_TYPE, sortingColumnType.name());
+
+      switch (sortingColumnType) {
+        case INT:
+          job.setMapOutputKeyClass(IntWritable.class);
+          break;
+        case LONG:
+          job.setMapOutputKeyClass(LongWritable.class);
+          break;
+        case FLOAT:
+          job.setMapOutputKeyClass(FloatWritable.class);
+          break;
+        case DOUBLE:
+          job.setMapOutputKeyClass(DoubleWritable.class);
+          break;
+        case STRING:
+          job.setMapOutputKeyClass(Text.class);
+          job.setSortComparatorClass(TextComparator.class);
+          break;
+        default:
+          throw new IllegalStateException(
+              "Unsupported data type: " + sortingColumnType + " for sorting column: " + sortingColumn);
+      }
+    } else {
+      job.setMapOutputKeyClass(NullWritable.class);
+    }
+
+    // Partition column
+    int numReduceTasks = 0;
+    if (_dataPreprocessingHelper._partitionColumn != null) {
+      numReduceTasks = _dataPreprocessingHelper._numPartitions;
+      jobConf.set(InternalConfigConstants.ENABLE_PARTITIONING, "true");
+      // NOTE(review): this partitioner is replaced by the setPartitionerClass() call at the end of this branch,
+      // so it never takes effect. Kept as-is; confirm whether it can be dropped together with its import.
+      job.setPartitionerClass(GenericPartitioner.class);
+      jobConf.set(InternalConfigConstants.PARTITION_COLUMN_CONFIG, _dataPreprocessingHelper._partitionColumn);
+      if (_dataPreprocessingHelper._partitionFunction != null) {
+        jobConf.set(InternalConfigConstants.PARTITION_FUNCTION_CONFIG, _dataPreprocessingHelper._partitionFunction);
+      }
+      jobConf.setInt(InternalConfigConstants.NUM_PARTITIONS_CONFIG, numReduceTasks);
+      job.setPartitionerClass(_dataPreprocessingHelper.getPartitioner());
+    } else {
+      if (_dataPreprocessingHelper._numOutputFiles > 0) {
+        numReduceTasks = _dataPreprocessingHelper._numOutputFiles;
+      } else {
+        // default number of input paths
+        numReduceTasks = _dataPreprocessingHelper._inputDataPaths.size();
+      }
+    }
+    // Maximum number of records per output file
+    jobConf.set(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE,
+        Integer.toString(_dataPreprocessingHelper._maxNumRecordsPerFile));
+    // Number of reducers
+    LOGGER.info("Number of reduce tasks for pre-processing job: {}", numReduceTasks);
+    job.setNumReduceTasks(numReduceTasks);
+
+    setUpMapperReducerConfigs(job);
+
+    return job;
+  }
+
+  /**
+   * Applies the settings that do not depend on the data format: jar, job name, output path, user-classpath-first,
+   * no success marker file, and the credentials file when the Hadoop token file environment variable is set.
+   */
+  private void setHadoopJobConfigs(Job job) {
+    job.setJarByClass(HadoopSegmentPreprocessingJob.class);
+    job.setJobName(getClass().getName());
+    FileOutputFormat.setOutputPath(job, _dataPreprocessingHelper._outputPath);
+    job.getConfiguration().set(JobContext.JOB_NAME, this.getClass().getName());
+    // Turn this on to always firstly use class paths that user specifies.
+    job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, "true");
+    // Turn this off since we don't need an empty file in the output directory
+    job.getConfiguration().set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "false");
+
+    String hadoopTokenFileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (hadoopTokenFileLocation != null) {
+      job.getConfiguration().set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, hadoopTokenFileLocation);
+    }
+  }
+
+  /**
+   * Delegates to the wrapped helper.
+   *
+   * @param inputPathDir path handed to the wrapped helper's {@code getSchema}
+   * @return the schema object returned by the wrapped helper; its concrete type depends on the data format
+   * @throws IOException if the wrapped helper fails to read the schema
+   */
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return _dataPreprocessingHelper.getSchema(inputPathDir);
+  }
+
+  /**
+   * Delegates to the wrapped helper.
+   *
+   * @param schema schema object previously obtained from {@link #getSchema(Path)}
+   */
+  public void validateConfigsAgainstSchema(Object schema) {
+    _dataPreprocessingHelper.validateConfigsAgainstSchema(schema);
+  }
+
+  /**
+   * Passes the table-level pre-processing settings on to the wrapped helper, unchanged and in the same order.
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    _dataPreprocessingHelper
+        .registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction, sortingColumn,
+            sortingColumnType, numOutputFiles, maxNumRecordsPerFile);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
new file mode 100644
index 0000000..591fd46
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopDataPreprocessingHelperFactory.java
@@ -0,0 +1,39 @@
+package org.apache.pinot.hadoop.job.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chooses the Hadoop pre-processing helper that matches the format of the input data files.
+ */
+public class HadoopDataPreprocessingHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopDataPreprocessingHelperFactory.class);
+
+  /**
+   * Looks up the AVRO and ORC data files for {@code inputPaths} and wraps the matching format helper.
+   *
+   * @param inputPaths location handed to {@code DataFileUtils.getDataFiles} for both file extensions
+   * @param outputPath output path handed to the format helper
+   * @return an AVRO helper when AVRO files were found, otherwise an ORC helper
+   * @throws IOException if listing the data files fails
+   * @throws IllegalStateException if both AVRO and ORC files were found, or neither
+   */
+  public static HadoopDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+      throws IOException {
+    final List<Path> avroInputs = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
+    final List<Path> orcInputs = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
+
+    final boolean hasAvro = !avroInputs.isEmpty();
+    final boolean hasOrc = !orcInputs.isEmpty();
+    Preconditions.checkState(!(hasAvro && hasOrc),
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroInputs, orcInputs,
+        inputPaths);
+    Preconditions.checkState(hasAvro || hasOrc, "Failed to find any AVRO or ORC file in directories: %s",
+        inputPaths);
+
+    if (!hasAvro) {
+      LOGGER.info("Found ORC files: {} in directories: {}", orcInputs, inputPaths);
+      return new HadoopOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcInputs, outputPath));
+    }
+    LOGGER.info("Found AVRO files: {} in directories: {}", avroInputs, inputPaths);
+    return new HadoopAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroInputs, outputPath));
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
new file mode 100644
index 0000000..ea2faf6
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopJobPreparer.java
@@ -0,0 +1,10 @@
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.Job;
+
+
+/**
+ * Contract for the format-specific part of setting up a Hadoop pre-processing job.
+ */
+public interface HadoopJobPreparer {
+
+  /**
+   * Applies the mapper, reducer and input/output settings to the given job.
+   *
+   * @param job job to configure
+   * @throws IOException if the implementation fails while preparing the settings
+   */
+  void setUpMapperReducerConfigs(Job job) throws IOException;
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..11ade11
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/HadoopOrcDataPreprocessingHelper.java
@@ -0,0 +1,48 @@
+package org.apache.pinot.hadoop.job.preprocess;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.orc.OrcConf;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.apache.orc.mapreduce.OrcInputFormat;
+import org.apache.orc.mapreduce.OrcOutputFormat;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import 
org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Hadoop job preparer for ORC input data.
+ *
+ * <p>Wraps the {@link DataPreprocessingHelper} handed to the constructor and contributes the ORC-specific
+ * mapper, reducer and input/output format settings of the pre-processing job.
+ */
+public class HadoopOrcDataPreprocessingHelper extends 
HadoopDataPreprocessingHelper {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HadoopOrcDataPreprocessingHelper.class);
+
+  /**
+   * @param dataPreprocessingHelper helper that is passed on to the base class and later queried for the sample
+   *     data path and the schema
+   */
+  public HadoopOrcDataPreprocessingHelper(DataPreprocessingHelper 
dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  /**
+   * Configures the given job to read, shuffle and write ORC data.
+   *
+   * <p>The schema is obtained through {@code getSchema} for the sample raw data path and validated through
+   * {@code validateConfigsAgainstSchema}. Its {@code toString()} form is stored in the job configuration as both
+   * the shuffle value schema and the output schema.
+   *
+   * @param job job to configure
+   * @throws IOException declared by the interface; presumably raised when the schema cannot be read
+   */
+  @Override
+  public void setUpMapperReducerConfigs(Job job)
+      throws IOException {
+    Object orcSchema = getSchema(_dataPreprocessingHelper._sampleRawDataPath);
+    String orcSchemaString = orcSchema.toString();
+    LOGGER.info("Orc schema is: {}", orcSchemaString);
+    validateConfigsAgainstSchema(orcSchema);
+
+    // Map side.
+    job.setInputFormatClass(OrcInputFormat.class);
+    job.setMapperClass(OrcDataPreprocessingMapper.class);
+    job.setMapOutputValueClass(OrcValue.class);
+    Configuration jobConf = job.getConfiguration();
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+
+    // Reduce side.
+    job.setReducerClass(OrcDataPreprocessingReducer.class);
+    // Use LazyOutputFormat to avoid creating empty files.
+    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcStruct.class);
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
index 284b698..3df7bb8 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/pom.xml
@@ -117,5 +117,13 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
index 2e3b023..fe51e00 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
@@ -21,11 +21,25 @@ package org.apache.pinot.ingestion.jobs;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pinot.ingestion.common.ControllerRestApi;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * * enable.preprocessing: false by default. Enables preprocessing job.
  */
 public abstract class SegmentPreprocessingJob extends BaseSegmentJob {
-  private static final Logger _logger = 
LoggerFactory.getLogger(SegmentPreprocessingJob.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentPreprocessingJob.class);
   protected final Path _schemaFile;
   protected final Path _inputSegmentDir;
   protected final Path _preprocessedOutputDir;
@@ -46,6 +60,21 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
   protected final Path _pathToDependencyJar;
   protected boolean _enablePreprocessing;
 
+  // Partitioning settings; assigned by fetchPartitioningConfig() when exactly one partition column is configured.
+  protected String _partitionColumn;
+  protected int _numPartitions;
+  protected String _partitionFunction;
+
+  // Sorting settings; assigned by fetchSortingConfig().
+  protected String _sortingColumn;
+  protected FieldSpec.DataType _sortingColumnType;
+
+  // Resizing settings; assigned by fetchResizingConfig(). _numOutputFiles is 0 when no reducer count is configured.
+  protected int _numOutputFiles;
+  protected int _maxNumRecordsPerFile;
+
+  // Assigned by setTableConfigAndSchema().
+  protected TableConfig _tableConfig;
+  protected org.apache.pinot.spi.data.Schema _pinotTableSchema;
+
+  // Enabled pre-processing operations; rebuilt by fetchPreProcessingOperations().
+  protected Set<DataPreprocessingUtils.Operation> _preprocessingOperations;
+
   public SegmentPreprocessingJob(final Properties properties) {
     super(properties);
 
@@ -59,13 +88,13 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
     _pathToDependencyJar = 
getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
     _schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
 
-    
_logger.info("*********************************************************************");
-    _logger.info("enable.preprocessing: {}", _enablePreprocessing);
-    _logger.info("path.to.input: {}", _inputSegmentDir);
-    _logger.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
-    _logger.info("path.to.deps.jar: {}", _pathToDependencyJar);
-    _logger.info("push.locations: {}", _pushLocations);
-    
_logger.info("*********************************************************************");
+    
LOGGER.info("*********************************************************************");
+    LOGGER.info("enable.preprocessing: {}", _enablePreprocessing);
+    LOGGER.info("path.to.input: {}", _inputSegmentDir);
+    LOGGER.info("preprocess.path.to.output: {}", _preprocessedOutputDir);
+    LOGGER.info("path.to.deps.jar: {}", _pathToDependencyJar);
+    LOGGER.info("push.locations: {}", _pushLocations);
+    
LOGGER.info("*********************************************************************");
   }
 
   protected abstract void run()
@@ -90,4 +119,131 @@ public abstract class SegmentPreprocessingJob extends 
BaseSegmentJob {
     // TODO: support orc format in the future.
     return fileName.endsWith(".avro");
   }
+
+  /**
+   * Loads the table config and the Pinot schema into {@code _tableConfig} and {@code _pinotTableSchema}.
+   *
+   * <p>Both values are fetched first and checked afterwards, so the schema lookup also runs when the table
+   * config turns out to be null.
+   *
+   * @throws IOException if {@code getTableConfig()} or {@code getSchema()} fails
+   * @throws IllegalStateException if either value is null
+   */
+  protected void setTableConfigAndSchema()
+      throws IOException {
+    _tableConfig = getTableConfig();
+    _pinotTableSchema = getSchema();
+
+    Preconditions.checkState(_tableConfig != null, "Table config cannot be 
null.");
+    Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be 
null");
+  }
+
+  /**
+   * Rebuilds {@code _preprocessingOperations} from the table's custom config.
+   *
+   * <p>The set is always replaced with a fresh one. It stays empty when the table has no custom config or the
+   * custom config map is null or empty; otherwise the value stored under
+   * {@code InternalConfigConstants.PREPROCESS_OPERATIONS} (empty string when absent) is handed to
+   * {@code DataPreprocessingUtils.getOperations} to fill the set.
+   */
+  protected void fetchPreProcessingOperations() {
+    _preprocessingOperations = new HashSet<>();
+
+    TableCustomConfig tableCustomConfig = _tableConfig.getCustomConfig();
+    if (tableCustomConfig == null) {
+      return;
+    }
+    Map<String, String> customConfigs = tableCustomConfig.getCustomConfigs();
+    if (customConfigs == null || customConfigs.isEmpty()) {
+      return;
+    }
+    String operations = customConfigs.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+    DataPreprocessingUtils.getOperations(_preprocessingOperations, operations);
+  }
+
+  /**
+   * Reads the partition column, number of partitions and partition function from the table config.
+   *
+   * <p>Does nothing beyond logging when the PARTITION operation is not enabled or the table has no segment
+   * partition config. The partition fields are only assigned when exactly one column is configured.
+   *
+   * @throws IllegalArgumentException if more than one partition column is configured
+   */
+  protected void fetchPartitioningConfig() {
+    // Fetch partition info from table config.
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION)) {
+      LOGGER.info("Partitioning is disabled.");
+      return;
+    }
+
+    SegmentPartitionConfig partitionConfig = _tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    if (partitionConfig == null) {
+      LOGGER.info("Segment partition config is null for table: {}", _tableConfig.getTableName());
+      return;
+    }
+
+    Map<String, ColumnPartitionConfig> partitionedColumns = partitionConfig.getColumnPartitionMap();
+    int numPartitionedColumns = partitionedColumns.size();
+    Preconditions.checkArgument(numPartitionedColumns <= 1,
+        "There should be at most 1 partition setting in the table.");
+    if (numPartitionedColumns != 1) {
+      return;
+    }
+    _partitionColumn = partitionedColumns.keySet().iterator().next();
+    _numPartitions = partitionConfig.getNumPartitions(_partitionColumn);
+    _partitionFunction = partitionConfig.getFunctionName(_partitionColumn);
+  }
+
+  /**
+   * Resolves the sorting column and its data type from the table config.
+   *
+   * <p>The column is taken from the field configs (index type {@code SORTED}) when one is declared there,
+   * otherwise from the sorted-column list of the indexing config. Whichever source it comes from, the column is
+   * then looked up in the Pinot schema and validated, so {@code _sortingColumnType} is always assigned together
+   * with {@code _sortingColumn}. Nothing is assigned when the SORT operation is not enabled or no sorted column
+   * is configured.
+   *
+   * @throws IllegalArgumentException if more than one sorted column is configured
+   * @throws IllegalStateException if the column is missing from the schema, is multi-valued, or has a data type
+   *     that cannot be sorted on
+   */
+  protected void fetchSortingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+      LOGGER.info("Sorting is disabled.");
+      return;
+    }
+    // Fetch sorting info from the field configs first.
+    List<String> sortingColumns = new ArrayList<>();
+    List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+    if (fieldConfigs != null) {
+      for (FieldConfig fieldConfig : fieldConfigs) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+          sortingColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+    if (sortingColumns.isEmpty()) {
+      // There is no sorted column specified in field configs, try to find sorted column from indexing config.
+      IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+      List<String> sortedColumns = indexingConfig.getSortedColumn();
+      if (sortedColumns != null) {
+        sortingColumns.addAll(sortedColumns);
+      }
+    }
+    Preconditions.checkArgument(sortingColumns.size() <= 1, "There should be at most 1 sorted column in the table.");
+    if (sortingColumns.isEmpty()) {
+      return;
+    }
+
+    // Validate the column and resolve its type for both sources; the job setup needs the type whenever a sorting
+    // column is set.
+    _sortingColumn = sortingColumns.get(0);
+    FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(_sortingColumn);
+    // Guava's Preconditions only substitutes %s placeholders.
+    Preconditions.checkState(fieldSpec != null, "Failed to find sorting column: %s in the schema", _sortingColumn);
+    Preconditions
+        .checkState(fieldSpec.isSingleValueField(), "Cannot sort on multi-value column: %s", _sortingColumn);
+    _sortingColumnType = fieldSpec.getDataType();
+    Preconditions
+        .checkState(_sortingColumnType.canBeASortedColumn(), "Cannot sort on %s column: %s", _sortingColumnType,
+            _sortingColumn);
+    LOGGER.info("Sorting the data with column: {} of type: {}", _sortingColumn, _sortingColumnType);
+  }
+
+  /**
+   * Reads the resizing settings (number of output files, maximum records per file) from the custom config.
+   *
+   * <p>Does nothing beyond logging when the RESIZE operation is not enabled. {@code _numOutputFiles} is set to 0
+   * when there is no custom config, no custom config map, or no reducer-count entry. For the maximum number of
+   * records per file, the deprecated key is consulted before the current one; {@code _maxNumRecordsPerFile} is
+   * left untouched when neither key is present.
+   *
+   * @throws IllegalStateException if the configured number of reducers is not positive
+   * @throws IllegalArgumentException if the configured maximum number of records per file is not positive
+   * @throws NumberFormatException if a configured value is not an integer
+   */
+  protected void fetchResizingConfig() {
+    if (!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.RESIZE)) {
+      LOGGER.info("Resizing is disabled.");
+      return;
+    }
+
+    TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+    Map<String, String> configs = customConfig == null ? null : customConfig.getCustomConfigs();
+    if (configs == null) {
+      _numOutputFiles = 0;
+      return;
+    }
+
+    // Number of output files.
+    if (!configs.containsKey(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS)) {
+      _numOutputFiles = 0;
+    } else {
+      _numOutputFiles = Integer.parseInt(configs.get(InternalConfigConstants.PREPROCESSING_NUM_REDUCERS));
+      Preconditions.checkState(_numOutputFiles > 0, String
+          .format("The value of %s should be positive! Current value: %s",
+              InternalConfigConstants.PREPROCESSING_NUM_REDUCERS, _numOutputFiles));
+    }
+
+    // Maximum number of records per file; the deprecated key wins when both are present.
+    final String effectiveKey;
+    if (configs.containsKey(InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE)) {
+      LOGGER.warn("The config: {} from custom config is deprecated. Use {} instead.",
+          InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE,
+          InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE);
+      effectiveKey = InternalConfigConstants.PARTITION_MAX_RECORDS_PER_FILE;
+    } else if (configs.containsKey(InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE)) {
+      effectiveKey = InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE;
+    } else {
+      return;
+    }
+    int maxNumRecords = Integer.parseInt(configs.get(effectiveKey));
+    // TODO: add a in-built maximum value for this config to avoid having too many small files.
+    // E.g. if the config is set to 1 which is smaller than this in-built value, the job should be abort from
+    // generating too many small files.
+    Preconditions.checkArgument(maxNumRecords > 0,
+        "The value of " + InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE
+            + " should be positive. Current value: " + maxNumRecords);
+    LOGGER.info("Setting {} to {}", InternalConfigConstants.PREPROCESSING_MAX_NUM_RECORDS_PER_FILE, maxNumRecords);
+    _maxNumRecordsPerFile = maxNumRecords;
+  }
 }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
similarity index 79%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
index 9e5f5f2..260c153 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/AvroDataPreprocessingHelper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/AvroDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -39,10 +39,10 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.AvroDataPreprocessingMapper;
-import 
org.apache.pinot.hadoop.job.partitioners.AvroDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.AvroDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import 
org.apache.pinot.ingestion.preprocess.mappers.AvroDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.partitioners.AvroDataPreprocessingPartitioner;
+import 
org.apache.pinot.ingestion.preprocess.reducers.AvroDataPreprocessingReducer;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,30 +61,19 @@ public class AvroDataPreprocessingHelper extends 
DataPreprocessingHelper {
   }
 
   @Override
-  public void setUpMapperReducerConfigs(Job job)
+  public Object getSchema(Path inputPathDir)
       throws IOException {
-    Schema avroSchema = getAvroSchema(_sampleRawDataPath);
-    LOGGER.info("Avro schema is: {}", avroSchema.toString(true));
-    validateConfigsAgainstSchema(avroSchema);
-
-    job.setInputFormatClass(AvroKeyInputFormat.class);
-    job.setMapperClass(AvroDataPreprocessingMapper.class);
-
-    job.setReducerClass(AvroDataPreprocessingReducer.class);
-    AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, 
avroSchema);
-    AvroMultipleOutputs.setCountersEnabled(job, true);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
-    job.setOutputKeyClass(AvroKey.class);
-    job.setOutputValueClass(NullWritable.class);
+    return getAvroSchema(inputPathDir);
+  }
 
-    AvroJob.setInputKeySchema(job, avroSchema);
-    AvroJob.setMapOutputValueSchema(job, avroSchema);
-    AvroJob.setOutputKeySchema(job, avroSchema);
+  @Override
+  public void validateConfigsAgainstSchema(Object schema) {
+    Schema avroSchema = (Schema) schema;
+    validateConfigsAgainstSchema(avroSchema);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     String sampleTimeColumnValue;
     try (DataFileStream<GenericRecord> dataStreamReader = 
getAvroReader(_sampleRawDataPath)) {
@@ -99,7 +88,7 @@ public class AvroDataPreprocessingHelper extends 
DataPreprocessingHelper {
    * @return Input schema
    * @throws IOException exception when accessing to IO
    */
-  private Schema getAvroSchema(Path inputPathDir)
+  protected Schema getAvroSchema(Path inputPathDir)
       throws IOException {
     Schema avroSchema = null;
     for (FileStatus fileStatus : 
HadoopUtils.DEFAULT_FILE_SYSTEM.listStatus(inputPathDir)) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
new file mode 100644
index 0000000..cb0a738
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelper.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for the format-specific data preprocessing helpers.
+ *
+ * <p>Holds the input/output paths together with the table config, Pinot
+ * schema and the partitioning, sorting and output sizing settings passed to
+ * {@link #registerConfigs}, and populates the append-related job configs
+ * in {@link #setValidationConfigs}.
+ */
+public abstract class DataPreprocessingHelper
+    implements SampleTimeColumnExtractable {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DataPreprocessingHelper.class);
+
+  // Partitioning settings.
+  public String _partitionColumn;
+  public int _numPartitions;
+  public String _partitionFunction;
+
+  // Sorting settings.
+  public String _sortingColumn;
+  public FieldSpec.DataType _sortingColumnType;
+
+  // Output sizing settings.
+  public int _numOutputFiles;
+  public int _maxNumRecordsPerFile;
+
+  public TableConfig _tableConfig;
+  public Schema _pinotTableSchema;
+
+  public List<Path> _inputDataPaths;
+  // First entry of the input paths.
+  public Path _sampleRawDataPath;
+  public Path _outputPath;
+
+  /**
+   * Creates a helper for the given input files and output location.
+   *
+   * @param inputDataPaths input data paths; must contain at least one path
+   * @param outputPath output path of the preprocessing job
+   * @throws IllegalArgumentException if inputDataPaths is null or empty
+   */
+  public DataPreprocessingHelper(List<Path> inputDataPaths, Path outputPath) {
+    if (inputDataPaths == null || inputDataPaths.isEmpty()) {
+      throw new IllegalArgumentException(
+          "At least one input data path is required");
+    }
+    _inputDataPaths = inputDataPaths;
+    _sampleRawDataPath = inputDataPaths.get(0);
+    _outputPath = outputPath;
+  }
+
+  /**
+   * Stores the table configs and the preprocessing settings on this helper.
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema,
+      String partitionColumn, int numPartitions, String partitionFunction,
+      String sortingColumn, FieldSpec.DataType sortingColumnType,
+      int numOutputFiles, int maxNumRecordsPerFile) {
+    _tableConfig = tableConfig;
+    _pinotTableSchema = tableSchema;
+    _partitionColumn = partitionColumn;
+    _numPartitions = numPartitions;
+    _partitionFunction = partitionFunction;
+
+    _sortingColumn = sortingColumn;
+    _sortingColumnType = sortingColumnType;
+
+    _numOutputFiles = numOutputFiles;
+    _maxNumRecordsPerFile = maxNumRecordsPerFile;
+  }
+
+  /** Returns the partitioner class provided by the concrete helper. */
+  public abstract Class<? extends Partitioner> getPartitioner();
+
+  /**
+   * Reads the format-specific schema of the input data.
+   *
+   * @param inputPathDir path to read the schema from
+   * @return the schema object; its concrete type depends on the subclass
+   * @throws IOException if the input cannot be read
+   */
+  public abstract Object getSchema(Path inputPathDir)
+      throws IOException;
+
+  /**
+   * Validates the registered configs against the given schema.
+   *
+   * @param schema schema object of the type the subclass works with
+   */
+  public abstract void validateConfigsAgainstSchema(Object schema);
+
+  /**
+   * Populates the append-related job configs. Does nothing unless the batch
+   * ingestion type is APPEND; time column configs need a time column.
+   *
+   * <p>For append use cases one time unit should be contained in one file;
+   * if there is more than one, the job should be disabled, as we should not
+   * resize for these use cases. Therefore the time column name and a sample
+   * value are recorded in the job configuration.
+   *
+   * @param job job whose configuration is populated
+   * @param path not used by this implementation
+   * @throws IOException if the sample time column value cannot be read
+   */
+  public void setValidationConfigs(Job job, Path path)
+      throws IOException {
+    // Constant-first comparison avoids an NPE should the type be null.
+    if (!"APPEND".equalsIgnoreCase(
+        IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig))) {
+      return;
+    }
+    // TODO: Serialize and deserialize validation config by creating toJson
+    // and fromJson
+    SegmentsValidationAndRetentionConfig validationConfig =
+        _tableConfig.getValidationConfig();
+    job.getConfiguration().set(InternalConfigConstants.IS_APPEND, "true");
+    job.getConfiguration().set(
+        InternalConfigConstants.SEGMENT_PUSH_FREQUENCY,
+        IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
+
+    String timeColumnName = validationConfig.getTimeColumnName();
+    if (timeColumnName == null) {
+      // Configuration#set rejects null values, so there is nothing to
+      // record when the table has no time column.
+      return;
+    }
+    job.getConfiguration().set(
+        InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
+
+    DateTimeFieldSpec dateTimeFieldSpec =
+        _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+    if (dateTimeFieldSpec != null) {
+      DateTimeFormatSpec formatSpec =
+          new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+      job.getConfiguration().set(
+          InternalConfigConstants.SEGMENT_TIME_TYPE,
+          formatSpec.getColumnUnit().toString());
+      job.getConfiguration().set(
+          InternalConfigConstants.SEGMENT_TIME_FORMAT,
+          formatSpec.getTimeFormat().toString());
+      job.getConfiguration().set(
+          InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN,
+          formatSpec.getSDFPattern());
+    }
+
+    String sampleTimeColumnValue = getSampleTimeColumnValue(timeColumnName);
+    if (sampleTimeColumnValue != null) {
+      job.getConfiguration().set(
+          InternalConfigConstants.TIME_COLUMN_VALUE, sampleTimeColumnValue);
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
index 2e91773..1cb07e3 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperFactory.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/DataPreprocessingHelperFactory.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.fs.Path;
-import org.apache.pinot.hadoop.utils.preprocess.DataFileUtils;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
similarity index 86%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
index aec0bb0..6168426 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/preprocess/OrcDataPreprocessingHelper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/OrcDataPreprocessingHelper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.preprocess;
+package org.apache.pinot.ingestion.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -43,10 +43,10 @@ import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
 import org.apache.orc.mapreduce.OrcInputFormat;
 import org.apache.orc.mapreduce.OrcOutputFormat;
-import org.apache.pinot.hadoop.job.mappers.OrcDataPreprocessingMapper;
-import 
org.apache.pinot.hadoop.job.partitioners.OrcDataPreprocessingPartitioner;
-import org.apache.pinot.hadoop.job.reducers.OrcDataPreprocessingReducer;
-import org.apache.pinot.hadoop.utils.preprocess.HadoopUtils;
+import 
org.apache.pinot.ingestion.preprocess.mappers.OrcDataPreprocessingMapper;
+import 
org.apache.pinot.ingestion.preprocess.partitioners.OrcDataPreprocessingPartitioner;
+import 
org.apache.pinot.ingestion.preprocess.reducers.OrcDataPreprocessingReducer;
+import org.apache.pinot.ingestion.utils.preprocess.HadoopUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.utils.StringUtils;
 import org.slf4j.Logger;
@@ -61,33 +61,24 @@ public class OrcDataPreprocessingHelper extends 
DataPreprocessingHelper {
   }
 
   @Override
-  Class<? extends Partitioner> getPartitioner() {
+  public Class<? extends Partitioner> getPartitioner() {
     return OrcDataPreprocessingPartitioner.class;
   }
 
   @Override
-  void setUpMapperReducerConfigs(Job job) {
-    TypeDescription orcSchema = getOrcSchema(_sampleRawDataPath);
-    String orcSchemaString = orcSchema.toString();
-    LOGGER.info("Orc schema is: {}", orcSchemaString);
-    validateConfigsAgainstSchema(orcSchema);
-
-    job.setInputFormatClass(OrcInputFormat.class);
-    job.setMapperClass(OrcDataPreprocessingMapper.class);
-    job.setMapOutputValueClass(OrcValue.class);
-    Configuration jobConf = job.getConfiguration();
-    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(jobConf, orcSchemaString);
+  public Object getSchema(Path inputPathDir)
+      throws IOException {
+    return getOrcSchema(inputPathDir);
+  }
 
-    job.setReducerClass(OrcDataPreprocessingReducer.class);
-    // Use LazyOutputFormat to avoid creating empty files.
-    LazyOutputFormat.setOutputFormatClass(job, OrcOutputFormat.class);
-    job.setOutputKeyClass(NullWritable.class);
-    job.setOutputValueClass(OrcStruct.class);
-    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(jobConf, orcSchemaString);
+  @Override
+  public void validateConfigsAgainstSchema(Object schema) {
+    TypeDescription orcSchema = (TypeDescription) schema;
+    validateConfigsAgainstSchema(orcSchema);
   }
 
   @Override
-  String getSampleTimeColumnValue(String timeColumnName)
+  public String getSampleTimeColumnValue(String timeColumnName)
       throws IOException {
     try (Reader reader = OrcFile
         .createReader(_sampleRawDataPath, 
OrcFile.readerOptions(HadoopUtils.DEFAULT_CONFIGURATION))) {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
new file mode 100644
index 0000000..5245330
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/SampleTimeColumnExtractable.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.ingestion.preprocess;
+
+import java.io.IOException;
+
+
+/**
+ * Implemented by helpers that can extract a sample value of the time
+ * column from the input data.
+ */
+public interface SampleTimeColumnExtractable {
+
+  /**
+   * Returns a sample value of the given time column.
+   *
+   * @param timeColumnName name of the time column
+   * @return sample value of the time column; callers check it for null
+   * @throws IOException if the value cannot be read
+   */
+  String getSampleTimeColumnValue(String timeColumnName)
+      throws IOException;
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
index 6278e8e..31229f0 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/AvroDataPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/AvroDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
similarity index 93%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
index d7d0694..19beaa6 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/OrcDataPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/OrcDataPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -27,9 +27,9 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.DataPreprocessingUtils;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.DataPreprocessingUtils;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
index 3d3fcec..4d8e2f2 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/mappers/SegmentPreprocessingMapper.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.mappers;
+package org.apache.pinot.ingestion.preprocess.mappers;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
@@ -30,7 +30,7 @@ import org.apache.avro.mapreduce.AvroJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
index 74799c7..d833964 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/AvroDataPreprocessingPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/AvroDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericRecord;
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
similarity index 88%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
index 7ca22cf..f9c1467 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/GenericPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/GenericPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroValue;
@@ -27,9 +27,9 @@ import 
org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.NUM_PARTITIONS_CONFIG;
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_COLUMN_CONFIG;
-import static 
org.apache.pinot.hadoop.job.InternalConfigConstants.PARTITION_FUNCTION_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.NUM_PARTITIONS_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_COLUMN_CONFIG;
+import static 
org.apache.pinot.ingestion.utils.InternalConfigConstants.PARTITION_FUNCTION_CONFIG;
 
 
 public class GenericPartitioner<T> extends Partitioner<T, 
AvroValue<GenericRecord>> implements Configurable {
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
similarity index 95%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
index bef2cef..d9b509d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/OrcDataPreprocessingPartitioner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/OrcDataPreprocessingPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import com.google.common.base.Preconditions;
 import java.util.List;
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
-import org.apache.pinot.hadoop.utils.preprocess.OrcUtils;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.preprocess.OrcUtils;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
index 0ba57db..1826bdc 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/partitioners/PartitionFunctionFactory.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/partitioners/PartitionFunctionFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.partitioners;
+package org.apache.pinot.ingestion.preprocess.partitioners;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
index 5fcbf10..4f236a4 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/AvroDataPreprocessingReducer.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/AvroDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.avro.generic.GenericRecord;
@@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
index a3387a2..10d3f10 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/reducers/OrcDataPreprocessingReducer.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/preprocess/reducers/OrcDataPreprocessingReducer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job.reducers;
+package org.apache.pinot.ingestion.preprocess.reducers;
 
 import java.io.IOException;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
-import org.apache.pinot.hadoop.job.InternalConfigConstants;
+import org.apache.pinot.ingestion.utils.InternalConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
index 4cc6f75..8450179 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataPreprocessingUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/DataPreprocessingUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils;
 
 import java.util.Set;
 import org.apache.hadoop.io.DoubleWritable;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
index 3701db2..e85a3e8 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/InternalConfigConstants.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.job;
+package org.apache.pinot.ingestion.utils;
 
 /**
  * Internal-only constants for Hadoop MapReduce jobs. These constants are 
propagated across different segment creation
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
similarity index 97%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
index 58e1c1d..a6e4ca7 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/DataFileUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/DataFileUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
similarity index 96%
copy from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
copy to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
index 0596259..e4cdf5e 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
similarity index 98%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
index dcfc3b5..09eb218 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/OrcUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/OrcUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.io.BooleanWritable;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
index 65f1222..05a437d 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/TextComparator.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/utils/preprocess/TextComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.ingestion.utils.preprocess;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
new file mode 100644
index 0000000..b4b72a3
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentPreprocessingJob.java
@@ -0,0 +1,89 @@
+package org.apache.pinot.spark.jobs;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.jobs.SegmentPreprocessingJob;
+import org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelper;
+import 
org.apache.pinot.spark.jobs.preprocess.SparkDataPreprocessingHelperFactory;
+import org.apache.pinot.spark.utils.HadoopUtils;
+import org.apache.pinot.spark.utils.PinotSparkJobPreparationHelper;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Spark flavor of the segment preprocessing job. It partitions, sorts and resizes the input files, which are raw
+ * data in either Avro or ORC format, so that the output files are partitioned, sorted and resized afterwards.
+ *
+ * <p>The following config needs to be specified in the job properties for the job to do any work:
+ * <ul>
+ *   <li>enable.preprocessing: false by default. Enables preprocessing job.</li>
+ * </ul>
+ */
+public class SparkSegmentPreprocessingJob extends SegmentPreprocessingJob {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSegmentPreprocessingJob.class);
+
+  public SparkSegmentPreprocessingJob(Properties properties) {
+    super(properties);
+  }
+
+  /**
+   * Executes the preprocessing job: invokes the inherited config-fetching steps, deletes any existing preprocessed
+   * output directory, builds the format-specific helper and runs it on a Spark session. Returns right after logging
+   * when preprocessing is disabled.
+   */
+  @Override
+  protected void run()
+      throws Exception {
+    // Guard clause: nothing below runs unless preprocessing has been enabled.
+    if (!_enablePreprocessing) {
+      LOGGER.info("Pre-processing job is disabled.");
+      return;
+    }
+    LOGGER.info("Starting {}", getClass().getSimpleName());
+
+    setTableConfigAndSchema();
+    fetchPreProcessingOperations();
+    fetchPartitioningConfig();
+    fetchSortingConfig();
+    fetchResizingConfig();
+
+    // Remove the preprocessed output dir if it is left over from an earlier run.
+    cleanUpPreprocessedOutputs(_preprocessedOutputDir);
+
+    SparkDataPreprocessingHelper helper =
+        SparkDataPreprocessingHelperFactory.generateDataPreprocessingHelper(_inputSegmentDir, _preprocessedOutputDir);
+    helper.registerConfigs(_tableConfig, _pinotTableSchema, _partitionColumn, _numPartitions, _partitionFunction,
+        _sortingColumn, _sortingColumnType, _numOutputFiles, _maxNumRecordsPerFile);
+
+    // The Spark context is obtained first so that the dependency jars are added before the session is requested.
+    JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+    addDepsJarToDistributedCache(javaSparkContext);
+
+    String appName = SparkSegmentPreprocessingJob.class.getSimpleName();
+    SparkSession sparkSession = SparkSession.builder().appName(appName).getOrCreate();
+    helper.setUpAndExecuteJob(sparkSession);
+  }
+
+  /**
+   * Placeholder with an intentionally empty body; {@link #run()} drives the job through the helper instead.
+   */
+  public void setUpAndExecuteJob(JavaSparkContext sparkContext) {
+  }
+
+  /**
+   * Recursively deletes the preprocessed output directory when it already exists; does nothing otherwise.
+   *
+   * @param preprocessedOutputDir directory to remove
+   * @throws IOException if the file system check or the deletion fails
+   */
+  public static void cleanUpPreprocessedOutputs(Path preprocessedOutputDir)
+      throws IOException {
+    if (!HadoopUtils.DEFAULT_FILE_SYSTEM.exists(preprocessedOutputDir)) {
+      return;
+    }
+    LOGGER.warn("Found output folder {}, deleting", preprocessedOutputDir);
+    HadoopUtils.DEFAULT_FILE_SYSTEM.delete(preprocessedOutputDir, true);
+  }
+
+  /**
+   * Hands the configured dependency jar path to {@link PinotSparkJobPreparationHelper}; skipped when no path is set.
+   *
+   * @param sparkContext context the dependency jars are added to
+   * @throws IOException propagated from the preparation helper
+   */
+  protected void addDepsJarToDistributedCache(JavaSparkContext sparkContext)
+      throws IOException {
+    if (_pathToDependencyJar == null) {
+      return;
+    }
+    PinotSparkJobPreparationHelper.addDepsJarToDistributedCacheHelper(HadoopUtils.DEFAULT_FILE_SYSTEM, sparkContext,
+        _pathToDependencyJar);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
new file mode 100644
index 0000000..3374f3c
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkAvroDataPreprocessingHelper.java
@@ -0,0 +1,39 @@
+package org.apache.pinot.spark.jobs.preprocess;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Spark preprocessing helper for Avro input files.
+ *
+ * <p>NOTE(review): only loading of the input is wired up here; partitioning, sorting, resizing and writing of the
+ * preprocessed output are not implemented in this class yet.
+ */
+public class SparkAvroDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+  public SparkAvroDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  /**
+   * Loads the input files of the wrapped helper through Spark's "avro" data source.
+   *
+   * @param sparkSession session used to read the input files
+   */
+  @Override
+  public void setUpAndExecuteJob(SparkSession sparkSession) {
+    // From Java, DataFrameReader#load takes the paths as String varargs, so a String[] is passed directly; no
+    // Scala collection conversion is needed.
+    sparkSession.read().format("avro").load(convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths));
+  }
+
+  /**
+   * RDD-based variant of the job set-up. This is an overload, not an override: the parent class only declares the
+   * {@link SparkSession} flavor.
+   *
+   * <p>NOTE(review): this reads the input as text lines, which looks unintended for Avro files -- confirm.
+   *
+   * @param sparkContext context used to read the input files
+   */
+  public void setUpAndExecuteJob(JavaSparkContext sparkContext) {
+    // textFile expects a comma-separated list of paths; List#toString would produce "[a, b]", which is not a path.
+    String inputPaths = String.join(",", convertPathsToStrings(_dataPreprocessingHelper._inputDataPaths));
+    // TODO: the RDD is not consumed yet.
+    JavaRDD<String> inputRDD = sparkContext.textFile(inputPaths, _dataPreprocessingHelper._numOutputFiles);
+  }
+
+  /**
+   * Converts Hadoop paths to their string form, keeping the input order.
+   *
+   * @param paths paths to convert
+   * @return one string per path
+   */
+  private String[] convertPathsToStrings(List<Path> paths) {
+    List<String> pathStrings = new ArrayList<>(paths.size());
+    for (Path path : paths) {
+      pathStrings.add(path.toString());
+    }
+    return pathStrings.toArray(new String[0]);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
new file mode 100644
index 0000000..54520df
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelper.java
@@ -0,0 +1,27 @@
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.spark.sql.SparkSession;
+
+
+/**
+ * Base class of the Spark preprocessing helpers. It wraps a {@link DataPreprocessingHelper} and leaves the actual
+ * Spark job to its subclasses.
+ */
+public abstract class SparkDataPreprocessingHelper {
+
+  protected DataPreprocessingHelper _dataPreprocessingHelper;
+
+  public SparkDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    _dataPreprocessingHelper = dataPreprocessingHelper;
+  }
+
+  /**
+   * Sets up and executes the preprocessing job on the given Spark session.
+   */
+  public abstract void setUpAndExecuteJob(SparkSession sparkSession);
+
+  /**
+   * Forwards all preprocessing settings, unchanged and in the same order, to the wrapped helper.
+   */
+  public void registerConfigs(TableConfig tableConfig, Schema tableSchema, String partitionColumn, int numPartitions,
+      String partitionFunction, String sortingColumn, FieldSpec.DataType sortingColumnType, int numOutputFiles,
+      int maxNumRecordsPerFile) {
+    DataPreprocessingHelper delegate = _dataPreprocessingHelper;
+    delegate.registerConfigs(tableConfig, tableSchema, partitionColumn, numPartitions, partitionFunction,
+        sortingColumn, sortingColumnType, numOutputFiles, maxNumRecordsPerFile);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
new file mode 100644
index 0000000..8f6a92e
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkDataPreprocessingHelperFactory.java
@@ -0,0 +1,39 @@
+package org.apache.pinot.spark.jobs.preprocess;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.ingestion.preprocess.AvroDataPreprocessingHelper;
+import org.apache.pinot.ingestion.preprocess.OrcDataPreprocessingHelper;
+import org.apache.pinot.ingestion.utils.preprocess.DataFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Chooses the Spark preprocessing helper that matches the format of the data files found for the input path.
+ */
+public class SparkDataPreprocessingHelperFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkDataPreprocessingHelperFactory.class);
+
+  /**
+   * Looks up the Avro and ORC data files for the input path and returns the helper for whichever format is present.
+   *
+   * @param inputPaths path that is searched for data files
+   * @param outputPath path handed to the created helper as its output location
+   * @return an Avro helper when Avro files are found, an ORC helper otherwise
+   * @throws IllegalStateException if both Avro and ORC files are found, or if neither is found
+   * @throws IOException propagated from the data file lookup
+   */
+  public static SparkDataPreprocessingHelper generateDataPreprocessingHelper(Path inputPaths, Path outputPath)
+      throws IOException {
+    final List<Path> avroFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.AVRO_FILE_EXTENSION);
+    final List<Path> orcFiles = DataFileUtils.getDataFiles(inputPaths, DataFileUtils.ORC_FILE_EXTENSION);
+
+    boolean hasAvroFiles = !avroFiles.isEmpty();
+    boolean hasOrcFiles = !orcFiles.isEmpty();
+
+    // Exactly one of the two formats must be present: reject a mix first, then reject an empty input.
+    Preconditions.checkState(!(hasAvroFiles && hasOrcFiles),
+        "Cannot preprocess mixed AVRO files: %s and ORC files: %s in directories: %s", avroFiles, orcFiles,
+        inputPaths);
+    Preconditions.checkState(hasAvroFiles || hasOrcFiles, "Failed to find any AVRO or ORC file in directories: %s",
+        inputPaths);
+
+    if (!hasAvroFiles) {
+      LOGGER.info("Found ORC files: {} in directories: {}", orcFiles, inputPaths);
+      return new SparkOrcDataPreprocessingHelper(new OrcDataPreprocessingHelper(orcFiles, outputPath));
+    }
+    LOGGER.info("Found AVRO files: {} in directories: {}", avroFiles, inputPaths);
+    return new SparkAvroDataPreprocessingHelper(new AvroDataPreprocessingHelper(avroFiles, outputPath));
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
new file mode 100644
index 0000000..f8340ed
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/preprocess/SparkOrcDataPreprocessingHelper.java
@@ -0,0 +1,16 @@
+package org.apache.pinot.spark.jobs.preprocess;
+
+import org.apache.pinot.ingestion.preprocess.DataPreprocessingHelper;
+import org.apache.spark.sql.SparkSession;
+
+
+/**
+ * Spark preprocessing helper for ORC input files.
+ *
+ * <p>ORC preprocessing is not implemented yet; executing the job fails fast instead of silently doing nothing.
+ */
+public class SparkOrcDataPreprocessingHelper extends SparkDataPreprocessingHelper {
+  public SparkOrcDataPreprocessingHelper(DataPreprocessingHelper dataPreprocessingHelper) {
+    super(dataPreprocessingHelper);
+  }
+
+  /**
+   * Always fails, because there is no ORC implementation yet.
+   *
+   * @param sparkSession session the job would run on
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setUpAndExecuteJob(SparkSession sparkSession) {
+    // An empty body would let the job report success without producing any preprocessed output, so surface the
+    // missing implementation explicitly.
+    throw new UnsupportedOperationException("Spark data preprocessing is not implemented for ORC input files yet");
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
similarity index 96%
rename from 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
rename to 
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
index 0596259..8ee89cf 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/utils/preprocess/HadoopUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/utils/HadoopUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.hadoop.utils.preprocess;
+package org.apache.pinot.spark.utils;
 
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to