This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bfddb7a  Pull the source fields Set out of the RecordReader/Decoder (#5303)
bfddb7a is described below

commit bfddb7a34dc60c80c059facaf0df3f7247fe81f9
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Apr 27 11:48:53 2020 -0700

    Pull the source fields Set out of the RecordReader/Decoder (#5303)
    
    Adding the source field names HashSet to the init of RecordReader and Decoder, thus removing it from inside the Reader/Decoder.
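    
    For illustration only, a minimal sketch of the new call pattern after this
    change (variable names here are hypothetical, not taken from the patch):
    
        // Callers compute the source fields once from the Pinot schema...
        Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
        // ...and pass them into the reader and decoder at init time.
        recordReader.init(dataFile, schema, recordReaderConfig, sourceFields);
        messageDecoder.init(decoderProps, schema, topicName, sourceFields);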
---
 .../realtime/HLRealtimeSegmentDataManager.java     |   5 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   5 +-
 .../core/data/readers/GenericRowRecordReader.java  |   3 +-
 .../readers/MultiplePinotSegmentRecordReader.java  |   2 +-
 .../data/readers/PinotSegmentRecordReader.java     |   3 +-
 .../pinot/core/minion/BackfillDateTimeColumn.java  | 199 ----------------
 .../apache/pinot/core/minion/SegmentPurger.java    |   4 +-
 .../core/minion/segment/MapperRecordReader.java    |   3 +-
 .../core/minion/segment/ReducerRecordReader.java   |   3 +-
 .../converter/RealtimeSegmentRecordReader.java     |   3 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |   7 +-
 .../readers/BackfillDateTimeRecordReaderTest.java  | 256 ---------------------
 .../data/readers/RecordReaderSampleDataTest.java   |  35 +--
 .../MutableSegmentImplNullValueVectorTest.java     |   3 +-
 .../mutable/MutableSegmentImplTest.java            |   3 +-
 .../impl/fakestream/FakeStreamConsumerFactory.java |   7 +-
 .../impl/fakestream/FakeStreamMessageDecoder.java  |   3 +-
 .../pinot/integration/tests/ClusterTest.java       |   4 +-
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  11 +-
 .../batch/common/SegmentGenerationTaskRunner.java  |  11 +-
 .../plugin/inputformat/avro/AvroRecordReader.java  |   4 +-
 .../avro/AvroRecordExtractorMapTypeTest.java       |   9 +-
 .../inputformat/avro/AvroRecordExtractorTest.java  |   2 +-
 .../inputformat/avro/AvroRecordReaderTest.java     |   2 +-
 .../avro/AvroRecordToPinotRowGeneratorTest.java    |   2 +-
 .../inputformat/avro/KafkaAvroMessageDecoder.java  |   8 +-
 .../inputformat/avro/SimpleAvroMessageDecoder.java |   8 +-
 ...aConfluentSchemaRegistryAvroMessageDecoder.java |   6 +-
 .../plugin/inputformat/csv/CSVRecordReader.java    |   4 +-
 .../inputformat/csv/CSVRecordExtractorTest.java    |  11 +-
 .../inputformat/csv/CSVRecordReaderTest.java       |   2 +-
 .../plugin/inputformat/json/JSONRecordReader.java  |   4 +-
 .../inputformat/json/JSONRecordExtractorTest.java  |   2 +-
 .../inputformat/json/JSONRecordReaderTest.java     |   2 +-
 .../plugin/inputformat/orc/ORCRecordReader.java    |   6 +-
 .../inputformat/orc/ORCRecordExtractorTest.java    |   2 +-
 .../inputformat/orc/ORCRecordReaderTest.java       |   2 +-
 .../inputformat/parquet/ParquetRecordReader.java   |   4 +-
 .../parquet/ParquetRecordExtractorTest.java        |   2 +-
 .../parquet/ParquetRecordReaderTest.java           |   2 +-
 .../inputformat/thrift/ThriftRecordReader.java     |   4 +-
 .../inputformat/thrift/ThriftRecordReaderTest.java |   9 +-
 .../stream/kafka09/KafkaConsumerFactory.java       |   5 +-
 .../stream/kafka09/KafkaStreamLevelConsumer.java   |   5 +-
 .../stream/kafka20/KafkaConsumerFactory.java       |   5 +-
 .../stream/kafka20/KafkaStreamLevelConsumer.java   |   5 +-
 .../stream/kafka/KafkaJSONMessageDecoder.java      |  10 +-
 .../stream/kafka/KafkaJSONMessageDecoderTest.java  |   2 +-
 .../evaluators/DefaultTimeSpecEvaluator.java       |   1 -
 .../pinot/spi/data/readers/RecordReader.java       |   6 +-
 .../spi/data/readers/RecordReaderFactory.java      |  13 +-
 .../pinot/spi/stream/StreamConsumerFactory.java    |   6 +-
 .../pinot/spi/stream/StreamDecoderProvider.java    |  12 +-
 .../pinot/spi/stream/StreamMessageDecoder.java     |  12 +-
 .../pinot/spi/utils/SchemaFieldExtractorUtils.java |  24 +-
 .../data/readers/AbstractRecordExtractorTest.java  |  11 +-
 .../spi/data/readers/AbstractRecordReaderTest.java |   8 +
 .../apache/pinot/spi/plugin/PluginManagerTest.java |   2 +-
 .../spi/utils/SchemaFieldExtractorUtilsTest.java   |  13 +-
 pinot-spi/src/test/resources/TestRecordReader.java |   3 +-
 .../pinot/tools/admin/PinotAdministrator.java      |   2 -
 .../command/BackfillDateTimeColumnCommand.java     | 230 ------------------
 .../tools/admin/command/CreateSegmentCommand.java  |   3 +-
 .../pinot/tools/streams/MeetupRsvpStream.java      |   3 +-
 64 files changed, 200 insertions(+), 848 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 98f127b..a2de83b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -167,8 +168,10 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // create and init stream level consumer
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
+    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
     _streamLevelConsumer = _streamConsumerFactory
-        .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata.getGroupId(tableNameWithType));
+        .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata.getGroupId(tableNameWithType),
+            sourceFields);
     _streamLevelConsumer.start();
 
     tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4779697..628a4d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -76,6 +76,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.TransientConsumerException;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -201,6 +202,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final TableConfig _tableConfig;
   private final RealtimeTableDataManager _realtimeTableDataManager;
   private final StreamMessageDecoder _messageDecoder;
+  private final Set<String> _sourceFields;
   private final int _segmentMaxRowCount;
   private final String _resourceDataDir;
   private final IndexLoadingConfig _indexLoadingConfig;
@@ -1151,7 +1153,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             .setConsumerDir(consumerDir);
 
     // Create message decoder
-    _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
+    _sourceFields = SchemaFieldExtractorUtils.extractSourceFields(_schema);
+    _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema, _sourceFields);
     _clientId = _streamTopic + "-" + _streamPartitionId;
 
     // Create record transformer
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index fecf96d..c6f1377 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.readers;
 
 import java.io.File;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -44,7 +45,7 @@ public class GenericRowRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 92f0bc7..23a17b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -110,7 +110,7 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index c146b0e..0205607 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.segment.ReadMode;
@@ -117,7 +118,7 @@ public class PinotSegmentRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
deleted file mode 100644
index 69d9b34..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
-import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>BackfillDateTimeColumn</code> class takes a segment, a timeSpec from the segment, and a
- * dateTimeSpec.
- * It creates a new segment with a new column corresponding to the dateTimeSpec configs, using the values from the timeSpec
- * <ul>
- *  <li>If a column corresponding to the dateTimeSpec already exists, it is overwritten</li>
- *  <li>If not, a new date time column is created</li>
- *  <li>If the segment contains star tree, it is recreated, putting date time column at the end</li>
- * </ul>
- * <p>
- */
-public class BackfillDateTimeColumn {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BackfillDateTimeColumn.class);
-
-  private final String _rawTableName;
-  private final File _originalIndexDir;
-  private final File _backfilledIndexDir;
-  private final TimeFieldSpec _srcTimeFieldSpec;
-  private final DateTimeFieldSpec _destDateTimeFieldSpec;
-
-  public BackfillDateTimeColumn(String rawTableName, File originalIndexDir, File backfilledIndexDir,
-      TimeFieldSpec srcTimeSpec, DateTimeFieldSpec destDateTimeSpec) {
-    _rawTableName = rawTableName;
-    _originalIndexDir = originalIndexDir;
-    _backfilledIndexDir = backfilledIndexDir;
-    Preconditions.checkArgument(!_originalIndexDir.getAbsolutePath().equals(_backfilledIndexDir.getAbsolutePath()),
-        "Original index dir and backfill index dir should not be the same");
-    _srcTimeFieldSpec = srcTimeSpec;
-    _destDateTimeFieldSpec = destDateTimeSpec;
-  }
-
-  public boolean backfill()
-      throws Exception {
-    SegmentMetadataImpl originalSegmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
-    String segmentName = originalSegmentMetadata.getName();
-    LOGGER.info("Start backfilling segment: {} in table: {}", segmentName, _rawTableName);
-
-    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_originalIndexDir);
-    BackfillDateTimeRecordReader wrapperReader =
-        new BackfillDateTimeRecordReader(segmentRecordReader, _srcTimeFieldSpec, _destDateTimeFieldSpec);
-    LOGGER.info("Segment dir: {} Output Dir: {}", _originalIndexDir.getAbsolutePath(),
-        _backfilledIndexDir.getAbsolutePath());
-
-    LOGGER.info("Creating segment generator config for {}", segmentName);
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig();
-    config.setFormat(FileFormat.PINOT);
-    config.setOutDir(_backfilledIndexDir.getAbsolutePath());
-    config.setOverwrite(true);
-    config.setTableName(_rawTableName);
-    config.setSegmentName(segmentName);
-    config.setSchema(wrapperReader.getSchema());
-
-    LOGGER.info("Creating segment for {} with config {}", segmentName, 
config.toString());
-    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config, new 
RecordReaderSegmentCreationDataSource(wrapperReader),
-        CompositeTransformer.getPassThroughTransformer());
-    driver.build();
-
-    return true;
-  }
-
-  public BackfillDateTimeRecordReader getBackfillDateTimeRecordReader(RecordReader baseRecordReader) {
-    return new BackfillDateTimeRecordReader(baseRecordReader, _srcTimeFieldSpec, _destDateTimeFieldSpec);
-  }
-
-  /**
-   * This record reader is a wrapper over another record reader.
-   * It simply reads the records from the base record reader, and adds a new field according to the
-   * dateTimeFieldSpec
-   */
-  public class BackfillDateTimeRecordReader implements RecordReader {
-    private final RecordReader _baseRecordReader;
-    private final TimeFieldSpec _timeFieldSpec;
-    private final DateTimeFieldSpec _dateTimeFieldSpec;
-    private final Schema _schema;
-
-    public BackfillDateTimeRecordReader(RecordReader baseRecordReader, TimeFieldSpec timeFieldSpec,
-        DateTimeFieldSpec dateTimeFieldSpec) {
-      _baseRecordReader = baseRecordReader;
-      _timeFieldSpec = timeFieldSpec;
-      _dateTimeFieldSpec = dateTimeFieldSpec;
-      _schema = baseRecordReader.getSchema();
-
-      // Add/replace the date time field spec to the schema
-      _schema.removeField(_dateTimeFieldSpec.getName());
-      _schema.addField(_dateTimeFieldSpec);
-    }
-
-    @Override
-    public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
-    }
-
-    @Override
-    public boolean hasNext() {
-      return _baseRecordReader.hasNext();
-    }
-
-    @Override
-    public GenericRow next()
-        throws IOException {
-      return next(new GenericRow());
-    }
-
-    /**
-     * Reads the next row from the baseRecordReader, and adds a dateTimeFieldSPec column to it
-     * {@inheritDoc}
-     * @see RecordReader#next(GenericRow)
-     */
-    @Override
-    public GenericRow next(GenericRow reuse)
-        throws IOException {
-      reuse = _baseRecordReader.next(reuse);
-      Long timeColumnValue = (Long) reuse.getValue(_timeFieldSpec.getName());
-      Object dateTimeColumnValue = convertTimeFieldToDateTimeFieldSpec(timeColumnValue);
-      reuse.putField(_dateTimeFieldSpec.getName(), dateTimeColumnValue);
-      return reuse;
-    }
-
-    /**
-     * Converts the time column value from timeFieldSpec to dateTimeFieldSpec
-     * @param timeColumnValue - time column value from timeFieldSpec
-     * @return
-     */
-    private Object convertTimeFieldToDateTimeFieldSpec(Object timeColumnValue) {
-      TimeGranularitySpec timeGranularitySpec = _timeFieldSpec.getOutgoingGranularitySpec();
-
-      DateTimeFormatSpec formatFromTimeSpec =
-          new DateTimeFormatSpec(timeGranularitySpec.getTimeUnitSize(), timeGranularitySpec.getTimeType().toString(),
-              timeGranularitySpec.getTimeFormat());
-      if (formatFromTimeSpec.getFormat().equals(_dateTimeFieldSpec.getFormat())) {
-        return timeColumnValue;
-      }
-
-      long timeColumnValueMS = timeGranularitySpec.toMillis(timeColumnValue);
-      DateTimeFormatSpec toFormat = new DateTimeFormatSpec(_dateTimeFieldSpec.getFormat());
-      return toFormat.fromMillisToFormat(timeColumnValueMS, Object.class);
-    }
-
-    @Override
-    public void rewind()
-        throws IOException {
-      _baseRecordReader.rewind();
-    }
-
-    @Override
-    public Schema getSchema() {
-      return _schema;
-    }
-
-    @Override
-    public void close()
-        throws IOException {
-      _baseRecordReader.close();
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 5cab0da..bdeaaae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -162,7 +163,8 @@ public class SegmentPurger {
     }
 
     @Override
-    public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+    public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig,
+        Set<String> sourceFields) {
     }
 
     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 97be252..d892a36 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
 import org.apache.pinot.spi.data.Schema;
@@ -55,7 +56,7 @@ public class MapperRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 52a7241..a727c79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.spi.data.Schema;
@@ -52,7 +53,7 @@ public class ReducerRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index 7ebd327..07f0615 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.realtime.converter;
 
 import java.io.File;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.spi.data.Schema;
@@ -57,7 +58,7 @@ public class RealtimeSegmentRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 9653174..2249271 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -57,6 +57,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,6 +98,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     Schema schema = segmentGeneratorConfig.getSchema();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
     String recordReaderClassName = segmentGeneratorConfig.getRecordReaderPath();
+    Set<String> fields = SchemaFieldExtractorUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
 
     // Allow for instantiation general record readers from a record reader path passed into segment generator config
     // If this is set, this will override the file format
@@ -108,7 +110,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
             fileFormat);
       }
       return RecordReaderFactory
-          .getRecordReaderByClass(recordReaderClassName, dataFile, schema, segmentGeneratorConfig.getReaderConfig());
+          .getRecordReaderByClass(recordReaderClassName, dataFile, schema, segmentGeneratorConfig.getReaderConfig(),
+              fields);
     }
 
     switch (fileFormat) {
@@ -118,7 +121,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       default:
         try {
           return org.apache.pinot.spi.data.readers.RecordReaderFactory
-              .getRecordReader(fileFormat, dataFile, schema, segmentGeneratorConfig.getReaderConfig());
+              .getRecordReader(fileFormat, dataFile, schema, segmentGeneratorConfig.getReaderConfig(), fields);
         } catch (Exception e) {
           throw new UnsupportedOperationException("Unsupported input file format: '" + fileFormat + "'", e);
         }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java
deleted file mode 100644
index 954e204..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.readers;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.minion.BackfillDateTimeColumn;
-import org.apache.pinot.core.minion.BackfillDateTimeColumn.BackfillDateTimeRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests the PinotSegmentRecordReader to check that the records being generated
- * are the same as the records used to create the segment
- */
-public class BackfillDateTimeRecordReaderTest {
-  private static final int NUM_ROWS = 10000;
-  private static final String TABLE_NAME = "myTable";
-  private static String D1 = "d1";
-  private static String D2 = "d2";
-  private static String M1 = "m1";
-  private static String M2 = "m2";
-
-  private List<GenericRow> createTestDataWithTimespec(TimeFieldSpec timeFieldSpec) {
-    List<GenericRow> rows = new ArrayList<>();
-    Random random = new Random();
-
-    Map<String, Object> fields;
-    for (int i = 0; i < NUM_ROWS; i++) {
-      fields = new HashMap<>();
-      fields.put(D1, RandomStringUtils.randomAlphabetic(2));
-      fields.put(D2, RandomStringUtils.randomAlphabetic(5));
-      fields.put(M1, Math.abs(random.nextInt()));
-      fields.put(M2, Math.abs(random.nextFloat()));
-
-      long timestamp = System.currentTimeMillis();
-      Object timeColumnValue = timeFieldSpec.getIncomingGranularitySpec().fromMillis(timestamp);
-      fields.put(timeFieldSpec.getName(), timeColumnValue);
-
-      GenericRow row = new GenericRow();
-      row.init(fields);
-      rows.add(row);
-    }
-    return rows;
-  }
-
-  private List<GenericRow> createTestDataWithTimespec(TimeFieldSpec timeFieldSpec,
-      DateTimeFieldSpec dateTimeFieldSpec) {
-    List<GenericRow> rows = new ArrayList<>();
-    Random random = new Random();
-
-    Map<String, Object> fields;
-    for (int i = 0; i < NUM_ROWS; i++) {
-      fields = new HashMap<>();
-      fields.put(D1, RandomStringUtils.randomAlphabetic(2));
-      fields.put(D2, RandomStringUtils.randomAlphabetic(5));
-      fields.put(M1, Math.abs(random.nextInt()));
-      fields.put(M2, Math.abs(random.nextFloat()));
-
-      long timestamp = System.currentTimeMillis();
-      Object timeColumnValue = timeFieldSpec.getIncomingGranularitySpec().fromMillis(timestamp);
-      fields.put(timeFieldSpec.getName(), timeColumnValue);
-
-      DateTimeFormatSpec toFormat = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-      Object dateTimeColumnValue = toFormat.fromMillisToFormat(timestamp, Object.class);
-      fields.put(dateTimeFieldSpec.getName(), dateTimeColumnValue);
-
-      GenericRow row = new GenericRow();
-      row.init(fields);
-      rows.add(row);
-    }
-    return rows;
-  }
-
-  private Schema createPinotSchemaWithTimeSpec(TimeFieldSpec timeSpec) {
-    Schema testSchema = new Schema();
-    testSchema.setSchemaName("schema");
-    FieldSpec spec;
-    spec = new DimensionFieldSpec(D1, DataType.STRING, true);
-    testSchema.addField(spec);
-    spec = new DimensionFieldSpec(D2, DataType.STRING, true);
-    testSchema.addField(spec);
-    spec = new MetricFieldSpec(M1, DataType.INT);
-    testSchema.addField(spec);
-    spec = new MetricFieldSpec(M2, DataType.FLOAT);
-    testSchema.addField(spec);
-    testSchema.addField(timeSpec);
-    return testSchema;
-  }
-
-  private Schema createPinotSchemaWithTimeSpec(TimeFieldSpec timeSpec, DateTimeFieldSpec dateTimeFieldSpec) {
-    Schema testSchema = createPinotSchemaWithTimeSpec(timeSpec);
-    testSchema.addField(dateTimeFieldSpec);
-    return testSchema;
-  }
-
-  private Schema createPinotSchemaWrapperWithDateTimeSpec(Schema schema, DateTimeFieldSpec dateTimeFieldSpec) {
-    schema.addField(dateTimeFieldSpec);
-    return schema;
-  }
-
-  @Test(dataProvider = "backfillRecordReaderDataProvider")
-  public void testBackfillDateTimeRecordReader(RecordReader baseRecordReader, TimeFieldSpec timeFieldSpec,
-      DateTimeFieldSpec dateTimeFieldSpec, Schema schemaExpected)
-      throws Exception {
-    BackfillDateTimeColumn backfillDateTimeColumn =
-        new BackfillDateTimeColumn(TABLE_NAME, new File("original"), new File("backup"), timeFieldSpec,
-            dateTimeFieldSpec);
-    try (BackfillDateTimeRecordReader wrapperReader = backfillDateTimeColumn
-        .getBackfillDateTimeRecordReader(baseRecordReader)) {
-
-      // check that schema has new column
-      Schema schemaActual = wrapperReader.getSchema();
-      Assert.assertEquals(schemaActual, schemaExpected);
-
-      DateTimeFieldSpec dateTimeFieldSpecActual = schemaActual.getDateTimeSpec(dateTimeFieldSpec.getName());
-      TimeFieldSpec timeFieldSpecActual = schemaActual.getTimeFieldSpec();
-      Assert.assertEquals(dateTimeFieldSpecActual, dateTimeFieldSpec);
-      Assert.assertEquals(timeFieldSpecActual, timeFieldSpec);
-
-      while (wrapperReader.hasNext()) {
-        GenericRow next = wrapperReader.next();
-
-        // check that new datetime column is generated
-        Object dateTimeColumnValueActual = next.getValue(dateTimeFieldSpec.getName());
-        Assert.assertNotNull(dateTimeColumnValueActual);
-
-        Object timeColumnValueActual = next.getValue(timeFieldSpec.getName());
-        Assert.assertNotNull(timeColumnValueActual);
-
-        // check that datetime column has correct value as per its format
-        Long timeColumnValueMS = timeFieldSpec.getIncomingGranularitySpec().toMillis(timeColumnValueActual);
-        DateTimeFormatSpec toFormat = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
-        Object dateTimeColumnValueExpected = toFormat.fromMillisToFormat(timeColumnValueMS, Object.class);
-        Assert.assertEquals(dateTimeColumnValueActual, dateTimeColumnValueExpected);
-      }
-    }
-  }
-
-  @DataProvider(name = "backfillRecordReaderDataProvider")
-  public Object[][] getDataForTestBackfillRecordReader()
-      throws Exception {
-    List<Object[]> entries = new ArrayList<>();
-
-    List<GenericRow> inputData;
-    Schema inputSchema;
-    RecordReader inputRecordReader;
-    TimeFieldSpec timeFieldSpec;
-    DateTimeFieldSpec dateTimeFieldSpec;
-    Schema wrapperSchema;
-
-    // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in millisSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    inputData = createTestDataWithTimespec(timeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
-    wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema, dateTimeFieldSpec);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, wrapperSchema});
-
-    // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in sdf day
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    inputData = createTestDataWithTimespec(timeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    dateTimeFieldSpec =
-        new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "1:HOURS");
-    wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema, dateTimeFieldSpec);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, wrapperSchema});
-
-    // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in hoursSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    inputData = createTestDataWithTimespec(timeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
-    wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema, dateTimeFieldSpec);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, wrapperSchema});
-
-    // timeSpec in millisSinceEpoch, generate dateTimeFieldSpec in 5 minutesSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "Date"));
-    inputData = createTestDataWithTimespec(timeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "5:MILLISECONDS:EPOCH", "1:HOURS");
-    wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema, dateTimeFieldSpec);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, wrapperSchema});
-
-    // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in millisSinceEpoch, override dateTimeFieldSpec in millisSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
-    inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec, dateTimeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, inputSchema});
-
-    // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in hoursSinceEpoch, override dateTimeFieldSpec in millisSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
-    inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec, dateTimeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
-    wrapperSchema = createPinotSchemaWithTimeSpec(timeFieldSpec, dateTimeFieldSpec);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpec, wrapperSchema});
-
-    // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in hoursSinceEpoch, add new dateTimeFieldSpec in millisSinceEpoch
-    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "Date"));
-    dateTimeFieldSpec = new DateTimeFieldSpec("hoursSinceEpoch", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
-    inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
-    inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec, dateTimeFieldSpec);
-    inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
-    DateTimeFieldSpec dateTimeFieldSpecNew =
-        new DateTimeFieldSpec("timestampInEpoch", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
-    wrapperSchema = createPinotSchemaWithTimeSpec(timeFieldSpec, dateTimeFieldSpec);
-    wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(wrapperSchema, dateTimeFieldSpecNew);
-    entries.add(new Object[]{inputRecordReader, timeFieldSpec, dateTimeFieldSpecNew, wrapperSchema});
-
-    return entries.toArray(new Object[entries.size()][]);
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index 0a4735f..69d766e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.core.data.readers;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -62,6 +64,7 @@ public class RecordReaderSampleDataTest {
   private final Schema SCHEMA_NO_INCOMING = new Schema.SchemaBuilder()
       .addTime("incoming", TimeUnit.SECONDS, FieldSpec.DataType.LONG, 
"time_day", TimeUnit.DAYS, FieldSpec.DataType.INT)
       .build();
+
   // Outgoing time column does not exist in the record, should read incoming time only
   private final Schema SCHEMA_NO_OUTGOING = new Schema.SchemaBuilder()
       .addTime("time_day", TimeUnit.SECONDS, FieldSpec.DataType.LONG, 
"outgoing", TimeUnit.DAYS, FieldSpec.DataType.INT)
@@ -72,11 +75,11 @@ public class RecordReaderSampleDataTest {
       throws Exception {
     CompositeTransformer defaultTransformer = CompositeTransformer.getDefaultTransformer(SCHEMA);
     try (RecordReader avroRecordReader = RecordReaderFactory
-        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null);
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames());
         RecordReader csvRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, null);
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames());
         RecordReader jsonRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, null)) {
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames())) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -117,11 +120,14 @@ public class RecordReaderSampleDataTest {
   public void testDifferentIncomingOutgoing()
       throws Exception {
     try (RecordReader avroRecordReader = RecordReaderFactory
-        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+            Sets.newHashSet("time_day", "column2"));
         RecordReader csvRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+        .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+            Sets.newHashSet("time_day", "column2"));
         RecordReader jsonRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null)) {
+        .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+            Sets.newHashSet("time_day", "column2"))) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -151,11 +157,11 @@ public class RecordReaderSampleDataTest {
   public void testNoIncoming()
       throws Exception {
     try (RecordReader avroRecordReader = RecordReaderFactory
-        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null);
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames());
         RecordReader csvRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null);
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames());
         RecordReader jsonRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null)) {
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames())) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -185,11 +191,12 @@ public class RecordReaderSampleDataTest {
   public void testNoOutgoing()
       throws Exception {
     try (RecordReader avroRecordReader = RecordReaderFactory
-        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null);
-        RecordReader csvRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null);
-        RecordReader jsonRecordReader = RecordReaderFactory
-            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null)) {
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+            Sets.newHashSet("time_day", "outgoing")); RecordReader csvRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+            Sets.newHashSet("time_day", "outgoing")); RecordReader jsonRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+            Sets.newHashSet("time_day", "outgoing"))) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
index 7719eb8..264d1f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
@@ -56,7 +56,8 @@ public class MutableSegmentImplNullValueVectorTest {
         .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
             false, true);
     GenericRow reuse = new GenericRow();
-    try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, _schema, null)) {
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.JSON, jsonFile, _schema, null, _schema.getColumnNames())) {
       while (recordReader.hasNext()) {
         recordReader.next(reuse);
         GenericRow transformedRow = _recordTransformer.transform(reuse);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 49a53b2..b3ba6eb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -86,7 +86,8 @@ public class MutableSegmentImplTest {
     StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
     _startTimeMs = System.currentTimeMillis();
 
-    try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.AVRO, avroFile, _schema, null)) {
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, avroFile, _schema, null, _schema.getColumnNames())) {
       GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
         _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 8a21772..a573d58 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.MessageBatch;
@@ -30,6 +31,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 
 /**
@@ -47,7 +49,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
-      String groupId) {
+      String groupId, Set<String> sourceFields) {
     return new FakeStreamLevelConsumer();
   }
 
@@ -91,7 +93,8 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // Message decoder
     Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
-    StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema);
+    StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema,
+        SchemaFieldExtractorUtils.extractSourceFields(pinotSchema));
     GenericRow decodedRow = new GenericRow();
     streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
     System.out.println(decodedRow);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
index 93bff70..c823cc6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
@@ -29,7 +30,7 @@ import org.apache.pinot.spi.stream.StreamMessageDecoder;
  */
 public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
   @Override
-  public void init(Map<String, String> props, Schema indexingSchema, String topicName) throws Exception {
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields) throws Exception {
 
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 2db50c6..1d84a67 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -70,7 +70,6 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -353,13 +352,12 @@ public abstract class ClusterTest extends ControllerTest {
     private DatumReader<GenericData.Record> _reader;
 
     @Override
-    public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+    public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
         throws Exception {
       // Load Avro schema
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile);
       _avroSchema = reader.getSchema();
       reader.close();
-      Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
       _recordExtractor = new AvroRecordExtractor();
       _recordExtractor.init(sourceFields, null);
       _reader = new GenericDatumReader<>(_avroSchema);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 25ad714..73e07e7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.integration.tests;
 
 import java.lang.reflect.Constructor;
 import java.util.Random;
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
@@ -53,12 +54,12 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
     private Random _random = new Random();
 
     public FlakyStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
-        String groupId) {
+        String groupId, Set<String> sourceFields) {
       try {
         final Constructor constructor = Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME)
-            .getConstructor(String.class, String.class, StreamConfig.class, Schema.class, String.class);
+            .getConstructor(String.class, String.class, StreamConfig.class, Schema.class, String.class, Set.class);
         _streamLevelConsumer = (StreamLevelConsumer) constructor
-            .newInstance(clientId, tableName, streamConfig, schema, groupId);
+            .newInstance(clientId, tableName, streamConfig, schema, groupId, sourceFields);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -111,8 +112,8 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
 
     @Override
     public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
-        String groupId) {
-      return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+        String groupId, Set<String> sourceFields) {
+      return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
     }
 
     @Override
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 91f1028..353450e 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -81,12 +81,6 @@ public class SegmentGenerationTaskRunner implements Serializable {
       recordReaderConfig = (RecordReaderConfig) JsonUtils.jsonNodeToObject(jsonNode, clazz);
     }
 
-    //init record reader
-    String readerClassName = _taskSpec.getRecordReaderSpec().getClassName();
-    RecordReader recordReader = PluginManager.get().createInstance(readerClassName);
-
-    recordReader.init(new File(_taskSpec.getInputFilePath()), schema, recordReaderConfig);
-
     //init segmentName Generator
     SegmentNameGenerator segmentNameGenerator = getSegmentNameGerator();
 
@@ -96,10 +90,13 @@ public class SegmentGenerationTaskRunner implements Serializable {
     segmentGeneratorConfig.setOutDir(_taskSpec.getOutputDirectoryPath());
     segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
     segmentGeneratorConfig.setSequenceId(_taskSpec.getSequenceId());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
+    segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
 
     //build segment
     SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
-    segmentIndexCreationDriver.init(segmentGeneratorConfig, recordReader);
+    segmentIndexCreationDriver.init(segmentGeneratorConfig);
     segmentIndexCreationDriver.build();
     return segmentIndexCreationDriver.getSegmentName();
   }
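
The task runner no longer constructs the RecordReader itself; the reader class name, reader config, and input path now travel on SegmentGeneratorConfig and the driver builds the reader internally. A minimal sketch of the resulting call pattern, reusing the setters shown in the hunk above (the reader class and file path values are illustrative):

    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
    segmentGeneratorConfig.setRecordReaderPath("org.apache.pinot.plugin.inputformat.csv.CSVRecordReader");
    segmentGeneratorConfig.setInputFilePath("/path/to/input.csv");
    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
    driver.init(segmentGeneratorConfig);  // the driver instantiates and initializes the RecordReader
    driver.build();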
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
index 61b7245..1a07dd6 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 
 /**
@@ -45,7 +44,7 @@ public class AvroRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
@@ -56,7 +55,6 @@ public class AvroRecordReader implements RecordReader {
       _avroReader.close();
       throw e;
     }
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
     _recordExtractor = new AvroRecordExtractor();
     _recordExtractor.init(sourceFields, null);
   }
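
Readers now receive the source fields instead of deriving them from the schema. A sketch of initializing the Avro reader under the new contract (the file path and variable names are illustrative):

    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    AvroRecordReader avroRecordReader = new AvroRecordReader();
    avroRecordReader.init(new File("/path/to/data.avro"), schema, null, sourceFields);
    while (avroRecordReader.hasNext()) {
      GenericRow row = avroRecordReader.next();
      // only the requested source fields are extracted into the row
    }
    avroRecordReader.close();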
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
index 0d1250d..7fed72d 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.inputformat.avro;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,6 +27,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -78,6 +80,11 @@ public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest
     return org.apache.pinot.spi.data.Schema.fromInputSteam(schemaInputStream);
   }
 
+  @Override
+  protected Set<String> getSourceFields() {
+    return Sets.newHashSet("map1", "map2");
+  }
+
   /**
    * Create an AvroRecordReader
    */
@@ -85,7 +92,7 @@ public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest
   protected RecordReader createRecordReader()
       throws IOException {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, _pinotSchema, null);
+    avroRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
     return avroRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index 568ec86..30b7531 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -48,7 +48,7 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
   protected RecordReader createRecordReader()
       throws IOException {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, _pinotSchema, null);
+    avroRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
     return avroRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
index 91c1daf..1ae0356 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
@@ -39,7 +39,7 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest {
   protected RecordReader createRecordReader()
       throws Exception {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, getPinotSchema(), null);
+    avroRecordReader.init(_dataFile, getPinotSchema(), null, _sourceFields);
     return avroRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
index 8e43788..8a43f61 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
@@ -47,7 +47,7 @@ public class AvroRecordToPinotRowGeneratorTest {
         new org.apache.pinot.spi.data.Schema.SchemaBuilder().setSchemaName("testSchema")
             .addTime("incomingTime", TimeUnit.MILLISECONDS, FieldSpec.DataType.LONG, "outgoingTime", TimeUnit.DAYS,
                 FieldSpec.DataType.INT).build();
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(pinotSchema);
+    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(pinotSchema);
 
     AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
     avroRecordExtractor.init(sourceFields, null);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
index d433bf9..67e8b27 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
@@ -41,7 +41,6 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -49,6 +48,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * An implementation of StreamMessageDecoder to read Avro records from a Kafka stream.
+ * NOTE: Do not use the schema inside the implementation; the schema will be removed from the params.
+ */
 @NotThreadSafe
 public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class);
@@ -82,7 +85,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private String[] _schemaRegistryUrls;
 
   @Override
-  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
       throws Exception {
     _schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL));
 
@@ -106,7 +109,6 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
         LOGGER.info("Populated schema cache with schema for {}", hashKey);
       }
     }
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
     String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
     // Backward compatibility to support Avro by default
     if (recordExtractorClass == null) {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
index 4da8843..bb2cf9c 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
@@ -33,11 +33,14 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * An implementation of StreamMessageDecoder to read simple Avro records from a stream.
+ * NOTE: Do not use the schema inside the implementation; the schema will be removed from the params.
+ */
 @NotThreadSafe
 public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private static final Logger LOGGER = LoggerFactory.getLogger(SimpleAvroMessageDecoder.class);
@@ -51,10 +54,9 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private GenericData.Record _avroRecordToReuse;
 
   @Override
-  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
       throws Exception {
     Preconditions.checkState(props.containsKey(SCHEMA), "Avro schema must be provided");
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
     _avroSchema = new org.apache.avro.Schema.Parser().parse(props.get(SCHEMA));
     _datumReader = new GenericDatumReader<>(_avroSchema);
     String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
index 2aa966b..849eb49 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
@@ -32,13 +32,13 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Decodes avro messages with confluent schema registry.
  * First byte is MAGIC = 0, second 4 bytes are the schema id, the remainder is the value.
+ * NOTE: Do not use the schema inside the implementation; the schema will be removed from the params.
  */
 public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
     private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
@@ -47,11 +47,9 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMes
     private String _topicName;
 
     @Override
-    public void init(Map<String, String> props, Schema indexingSchema, String topicName) throws Exception {
+    public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields) throws Exception {
         checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
         String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
-        Preconditions.checkNotNull(indexingSchema, "Schema must be provided");
-        Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
         SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
         _deserializer = new KafkaAvroDeserializer(schemaRegistryClient);
         Preconditions.checkNotNull(topicName, "Topic must be provided");
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 2ad6318..df6ce4f 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -32,7 +32,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 
 /**
@@ -52,7 +51,7 @@ public class CSVRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
@@ -95,7 +94,6 @@ public class CSVRecordReader implements RecordReader {
       _format = format;
       _multiValueDelimiter = config.getMultiValueDelimiter();
     }
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
     _recordExtractor = new CSVRecordExtractor();
     CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
     recordExtractorConfig.setMultiValueDelimiter(_multiValueDelimiter);
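
The CSV reader follows the same pattern: format specifics stay in the reader config, while the extraction scope arrives as the new argument. A hedged sketch (the delimiter value and file path are invented for illustration):

    CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
    csvRecordReaderConfig.setMultiValueDelimiter(';');
    CSVRecordReader csvRecordReader = new CSVRecordReader();
    csvRecordReader.init(new File("/path/to/data.csv"), schema, csvRecordReaderConfig,
        SchemaFieldExtractorUtils.extractSourceFields(schema));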
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
index d561f09..233e686 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
@@ -49,7 +49,7 @@ public class CSVRecordExtractorTest extends AbstractRecordExtractorTest {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
     CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(_dataFile, _pinotSchema, csvRecordReaderConfig);
+    csvRecordReader.init(_dataFile, _pinotSchema, csvRecordReaderConfig, _sourceFieldNames);
     return csvRecordReader;
   }
 
@@ -59,13 +59,14 @@ public class CSVRecordExtractorTest extends AbstractRecordExtractorTest {
   @Override
   public void createInputFile()
       throws IOException {
+    String[] header = _sourceFieldNames.toArray(new String[0]);
     try (FileWriter fileWriter = new FileWriter(_dataFile); CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
-        CSVFormat.DEFAULT.withHeader(_sourceFieldNames.toArray(new String[0])))) {
+        CSVFormat.DEFAULT.withHeader(header))) {
 
       for (Map<String, Object> inputRecord : _inputRecords) {
-        Object[] record = new Object[_sourceFieldNames.size()];
-        for (int i = 0; i < _sourceFieldNames.size(); i++) {
-          Object value = inputRecord.get(_sourceFieldNames.get(i));
+        Object[] record = new Object[header.length];
+        for (int i = 0; i < header.length; i++) {
+          Object value = inputRecord.get(header[i]);
           if (value instanceof Collection) {
             record[i] = StringUtils.join(((List) value).toArray(), CSV_MULTI_VALUE_DELIMITER);
           } else {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index cef8d71..58bcb16 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -43,7 +43,7 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
     CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig);
+    csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig, _sourceFields);
     return csvRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
index 4636c86..c9a2fb4 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 
 /**
@@ -60,11 +59,10 @@ public class JSONRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
     _recordExtractor = new JSONRecordExtractor();
     _recordExtractor.init(sourceFields, null);
     init();
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
index cfea8fd..9451425 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
@@ -42,7 +42,7 @@ public class JSONRecordExtractorTest extends AbstractRecordExtractorTest {
   protected RecordReader createRecordReader()
       throws IOException {
     JSONRecordReader recordReader = new JSONRecordReader();
-    recordReader.init(_dataFile, _pinotSchema, null);
+    recordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
     return recordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
index f89a6ac..b4904e1 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
@@ -38,7 +38,7 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest {
   protected RecordReader createRecordReader()
       throws Exception {
     JSONRecordReader recordReader = new JSONRecordReader();
-    recordReader.init(_dateFile, getPinotSchema(), null);
+    recordReader.init(_dateFile, getPinotSchema(), null, _sourceFields);
     return recordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index 824a33b..2bf8193 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -45,7 +45,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.spi.utils.StringUtils;
 
 
@@ -75,7 +74,7 @@ public class ORCRecordReader implements RecordReader {
   private int _nextRowId;
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException {
     _schema = schema;
 
@@ -89,7 +88,6 @@ public class ORCRecordReader implements RecordReader {
     _orcFieldTypes = orcSchema.getChildren();
 
     // Only read the required fields
-    Set<String> schemaFields = SchemaFieldExtractorUtils.extract(schema);
     int numOrcFields = _orcFields.size();
     _includeOrcFields = new boolean[numOrcFields];
     // NOTE: Include for ORC reader uses field id as the index
@@ -97,7 +95,7 @@ public class ORCRecordReader implements RecordReader {
     orcReaderInclude[orcSchema.getId()] = true;
     for (int i = 0; i < numOrcFields; i++) {
       String field = _orcFields.get(i);
-      if (schemaFields.contains(field)) {
+      if (sourceFields.contains(field)) {
         TypeDescription fieldType = _orcFieldTypes.get(i);
         TypeDescription.Category category = fieldType.getCategory();
         if (category == TypeDescription.Category.LIST) {
diff --git 
a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
index 8e891f3..7b4c7c5 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
@@ -50,7 +50,7 @@ public class ORCRecordExtractorTest extends AbstractRecordExtractorTest {
   protected RecordReader createRecordReader()
       throws IOException {
     ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(_dataFile, _pinotSchema, null);
+    orcRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
     return orcRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index 252874b..3cf9de2 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -44,7 +44,7 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
   protected RecordReader createRecordReader()
       throws Exception {
     ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(_dataFile, _pinotSchema, null);
+    orcRecordReader.init(_dataFile, _pinotSchema, null, _sourceFields);
     return orcRecordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index d4ed56f..eb71d7e 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 
 
 /**
@@ -43,12 +42,11 @@ public class ParquetRecordReader implements RecordReader {
   private GenericRecord _nextRecord;
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException {
     _dataFilePath = new Path(dataFile.getAbsolutePath());
     _schema = schema;
     ParquetUtils.validateSchema(_schema, ParquetUtils.getParquetSchema(_dataFilePath));
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
     _recordExtractor = new ParquetRecordExtractor();
     _recordExtractor.init(sourceFields, null);
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
index 976d935..95c6496 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
@@ -54,7 +54,7 @@ public class ParquetRecordExtractorTest extends AbstractRecordExtractorTest {
   protected RecordReader createRecordReader()
       throws IOException {
     ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(_dataFile, _pinotSchema, null);
+    recordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
     return recordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 4761e46..b577799 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -39,7 +39,7 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   protected RecordReader createRecordReader()
       throws Exception {
     ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(_dataFile, getPinotSchema(), null);
+    recordReader.init(_dataFile, getPinotSchema(), null, _sourceFields);
     return recordReader;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
index 42bcd34..5670b04 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TFieldIdEnum;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -76,7 +75,7 @@ public class ThriftRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig config)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig config, Set<String> sourceFields)
       throws IOException {
     ThriftRecordReaderConfig recordReaderConfig = (ThriftRecordReaderConfig) config;
     _dataFile = dataFile;
@@ -94,7 +93,6 @@ public class ThriftRecordReader implements RecordReader {
       _fieldIds.put(tFieldIdEnum.getFieldName(), index);
       index++;
     }
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
     ThriftRecordExtractorConfig recordExtractorConfig = new ThriftRecordExtractorConfig();
     recordExtractorConfig.setFieldIds(_fieldIds);
     _recordExtractor = new ThriftRecordExtractor();
diff --git 
a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
index 899c54e..906c6b3 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.inputformat.thrift;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -95,7 +96,7 @@ public class ThriftRecordReaderTest {
   public void testReadData()
       throws IOException {
     ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
+    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig(), getSourceFields());
     List<GenericRow> genericRows = new ArrayList<>();
     while (recordReader.hasNext()) {
       genericRows.add(recordReader.next());
@@ -114,7 +115,7 @@ public class ThriftRecordReaderTest {
   public void testRewind()
       throws IOException {
     ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
+    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig(), getSourceFields());
     List<GenericRow> genericRows = new ArrayList<>();
     while (recordReader.hasNext()) {
       genericRows.add(recordReader.next());
@@ -150,6 +151,10 @@ public class ThriftRecordReaderTest {
         .addMultiValueDimension("set_values", 
FieldSpec.DataType.STRING).build();
   }
 
+  private Set<String> getSourceFields() {
+    return Sets.newHashSet("id", "name", "created_at", "active", "groups", 
"set_values");
+  }
+
   @AfterClass
   public void tearDown() {
     FileUtils.deleteQuietly(_tempFile);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 18228ea..1145ba7 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kafka09;
 
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -51,8 +52,8 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
    */
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
-      String groupId) {
-    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+      String groupId, Set<String> sourceFields) {
+    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
index c727d31..38dcef3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kafka09;
 
+import java.util.Set;
 import kafka.consumer.ConsumerIterator;
 import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -52,12 +53,12 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
   private long currentCount = 0L;
 
   public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
-      String groupId) {
+      String groupId, Set<String> sourceFields) {
     _clientId = clientId;
     _streamConfig = streamConfig;
     _kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, tableName, groupId);
 
-    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);
 
     _tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
     INSTANCE_LOGGER = LoggerFactory
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 444e8f7..8c5c793 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -33,8 +34,8 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
 
   @Override
   public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
-      String groupId) {
-    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+      String groupId, Set<String> sourceFields) {
+    return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
   }
 
   @Override
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
index 60a04b7..e01ac94 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -64,12 +65,12 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
 
 
   public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
-      String groupId) {
+      String groupId, Set<String> sourceFields) {
     _clientId = clientId;
     _streamConfig = streamConfig;
     _kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId);
 
-    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);
 
     _tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
     INSTANCE_LOGGER = LoggerFactory
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index 5ee48c0..3355247 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -21,13 +21,10 @@ package org.apache.pinot.plugin.stream.kafka;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -37,6 +34,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * An implementation of StreamMessageDecoder to read JSON records from a stream.
+ * NOTE: Do not use the schema inside the implementation; the schema will be removed from the params.
+ */
 public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -45,9 +46,8 @@ public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
   private RecordExtractor<Map<String, Object>> _jsonRecordExtractor;
 
   @Override
-  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
       throws Exception {
-    Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
     String recordExtractorClass = null;
     if (props != null) {
       recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
index b1553d0..29fa10f 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
@@ -63,7 +63,7 @@ public class KafkaJSONMessageDecoderTest {
     try (BufferedReader reader = new BufferedReader(
         new FileReader(getClass().getClassLoader().getResource("data/test_sample_data.json").getFile()))) {
       KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
-      decoder.init(new HashMap<>(), schema, "testTopic");
+      decoder.init(new HashMap<>(), schema, "testTopic", schema.getColumnNames());
       GenericRow r = new GenericRow();
       String line = reader.readLine();
       while (line != null) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
index 77eaec5..dcb242d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.spi.data.function.evaluators;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.spi.data.TimeFieldSpec;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 4af4687..23be865 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.Schema;
 
@@ -38,11 +39,12 @@ public interface RecordReader extends Closeable {
    * @param dataFile Data file
    * @param schema Pinot Schema associated with the table
   * @param recordReaderConfig Config for the reader specific to the format. e.g. delimiter for csv format etc
+   * @param sourceFields The fields to extract from the source record
    * @throws IOException If an I/O error occurs
    *
-   * TODO: decouple Schema and RecordReader
+   * TODO: decouple Schema and RecordReader. This should be easier now that we have the field names to extract
    */
-  void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
       throws IOException;
 
   /**
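
Implementations now receive the extraction scope up front instead of computing it. A minimal skeleton against the new interface, as a sketch only (the class name and body are hypothetical):

    public class MyFormatRecordReader implements RecordReader {
      private Set<String> _sourceFields;

      @Override
      public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig,
          Set<String> sourceFields)
          throws IOException {
        _sourceFields = sourceFields;  // no SchemaFieldExtractorUtils lookup needed here anymore
        // open dataFile and set up format-specific state
      }

      // hasNext()/next()/rewind()/close() unchanged from the existing contract
    }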
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
index e7cb6d9..8817852 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -120,10 +121,10 @@ public class RecordReaderFactory {
    * @throws Exception
    */
   public static RecordReader getRecordReaderByClass(String recordReaderClassName, File dataFile, Schema schema,
-      RecordReaderConfig recordReaderConfig)
+      RecordReaderConfig recordReaderConfig, Set<String> fields)
       throws Exception {
     RecordReader recordReader = PluginManager.get().createInstance(recordReaderClassName);
-    recordReader.init(dataFile, schema, recordReaderConfig);
+    recordReader.init(dataFile, schema, recordReaderConfig, fields);
     return recordReader;
   }
 
@@ -138,9 +139,9 @@ public class RecordReaderFactory {
    * @throws Exception Any exception while initializing the RecordReader
    */
   public static RecordReader getRecordReader(FileFormat fileFormat, File dataFile, Schema schema,
-      RecordReaderConfig recordReaderConfig)
+      RecordReaderConfig recordReaderConfig, Set<String> fields)
       throws Exception {
-    return getRecordReader(fileFormat.name(), dataFile, schema, recordReaderConfig);
+    return getRecordReader(fileFormat.name(), dataFile, schema, recordReaderConfig, fields);
   }
 
   /**
@@ -154,12 +155,12 @@ public class RecordReaderFactory {
    * @throws Exception Any exception while initializing the RecordReader
    */
   public static RecordReader getRecordReader(String fileFormatStr, File dataFile, Schema schema,
-      RecordReaderConfig recordReaderConfig)
+      RecordReaderConfig recordReaderConfig, Set<String> fields)
       throws Exception {
     String fileFormatKey = fileFormatStr.toUpperCase();
     if (DEFAULT_RECORD_READER_CLASS_MAP.containsKey(fileFormatKey)) {
       return getRecordReaderByClass(DEFAULT_RECORD_READER_CLASS_MAP.get(fileFormatKey), dataFile, schema,
-          recordReaderConfig);
+          recordReaderConfig, fields);
     }
     throw new UnsupportedOperationException(
         "No supported RecordReader found for file format - '" + fileFormatStr + "'");
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 60b1da2..caa5d6b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.Set;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -49,10 +50,11 @@ public abstract class StreamConsumerFactory {
    * @param tableName the table name for the topic of this consumer
    * @param schema the pinot schema of the event being consumed
    * @param groupId consumer group Id
-   * @return
+   * @param sourceFields the fields to extract from the source stream
+   * @return the stream level consumer
    */
   public abstract StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
-      String groupId);
+      String groupId, Set<String> sourceFields);
 
   /**
    * Creates a metadata provider which provides partition specific metadata
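
Stream plugins implement the widened factory method the same way the Kafka factories above do; a sketch for a hypothetical plugin (MyStreamLevelConsumer is invented):

      @Override
      public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
          String groupId, Set<String> sourceFields) {
        return new MyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
      }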
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
index e4cac8a..852be98 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.plugin.PluginManager;
@@ -31,17 +32,18 @@ public abstract class StreamDecoderProvider {
 
   /**
   * Constructs a {@link StreamMessageDecoder} using properties in {@link StreamConfig} and initializes it
-   * @param streamConfig
-   * @param schema
-   * @return
+   * @param streamConfig the stream configs from the table config
+   * @param schema the schema of the Pinot table
+   * @param sourceFields the fields to extract from the source stream
+   * @return the StreamMessageDecoder
    */
-  public static StreamMessageDecoder create(StreamConfig streamConfig, Schema schema) {
+  public static StreamMessageDecoder create(StreamConfig streamConfig, Schema schema, Set<String> sourceFields) {
     StreamMessageDecoder decoder = null;
     String decoderClass = streamConfig.getDecoderClass();
     Map<String, String> decoderProperties = streamConfig.getDecoderProperties();
     try {
       decoder = PluginManager.get().createInstance(decoderClass);
-      decoder.init(decoderProperties, schema, streamConfig.getTopicName());
+      decoder.init(decoderProperties, schema, streamConfig.getTopicName(), sourceFields);
     } catch (Exception e) {
       ExceptionUtils.rethrow(e);
     }
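
Callers therefore compute the fields once and hand them to the provider; a short usage sketch grounded in the methods shown in this commit:

    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    StreamMessageDecoder decoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);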
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index 05a78ef..c3a7347 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.data.Schema;
@@ -33,14 +34,17 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 @InterfaceStability.Stable
 public interface StreamMessageDecoder<T> {
 
-  static final String RECORD_EXTRACTOR_CONFIG_KEY = "recordExtractorClass";
+  String RECORD_EXTRACTOR_CONFIG_KEY = "recordExtractorClass";
 
   /**
-   * Initialize the decoder with decoder properties map, the stream topic name and stream schema
-   * @param props
+   * Initialize the decoder
+   * @param props decoder properties extracted from the {@link StreamConfig}
+   * @param indexingSchema the Pinot schema. TODO: remove Schema from StreamMessageDecoder; do not use it inside the implementation, as it will be removed
+   * @param topicName topic name of the stream
+   * @param sourceFields the fields to be read from the source stream's record
    * @throws Exception
    */
-  void init(Map<String, String> props, Schema indexingSchema, String topicName)
+  void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
       throws Exception;
 
   /**
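
A decoder built against the new contract keys its extractor off sourceFields and leaves the schema alone; a minimal sketch (the class name is hypothetical, with extractor wiring modeled on the decoders above):

    public class MyMessageDecoder implements StreamMessageDecoder<byte[]> {
      private RecordExtractor _recordExtractor;

      @Override
      public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
          throws Exception {
        // indexingSchema is slated for removal; do not depend on it here
        _recordExtractor = PluginManager.get().createInstance(props.get(RECORD_EXTRACTOR_CONFIG_KEY));
        _recordExtractor.init(sourceFields, null);
      }

      // decode(...) methods unchanged
    }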
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
index 1a53756..0c34046 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
@@ -19,17 +19,15 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
+import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +48,7 @@ public class SchemaFieldExtractorUtils {
    *
   * TODO: for now, we assume that arguments to transform function are in the source i.e. there's no columns which are derived from transformed columns
    */
-  public static Set<String> extract(Schema schema) {
+  public static Set<String> extractSourceFields(Schema schema) {
     Set<String> sourceFieldNames = new HashSet<>();
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       if (!fieldSpec.isVirtualColumn()) {
@@ -64,22 +62,6 @@ public class SchemaFieldExtractorUtils {
     return sourceFieldNames;
   }
 
-  @VisibleForTesting
-  public static Set<String> extractSource(Schema schema) {
-    Set<String> sourceFieldNames = new HashSet<>();
-    for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-      if (!fieldSpec.isVirtualColumn()) {
-        ExpressionEvaluator expressionEvaluator = ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
-        if (expressionEvaluator != null) {
-          sourceFieldNames.addAll(expressionEvaluator.getArguments());
-        } else {
-          sourceFieldNames.add(fieldSpec.getName());
-        }
-      }
-    }
-    return sourceFieldNames;
-  }
-
   /**
   * Validates that for a field spec with transform function, the source column name and destination column name are exclusive
    * i.e. do not allow using source column name for destination column
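
The renamed extractSourceFields is now the single place where transform arguments are resolved: for a field defined by a transform function it collects the function's argument columns, otherwise the field's own name. A worked example (the schema and column names are invented):

    // Schema: "fullName" derived via a transform of "firstName" and "lastName"; "age" is a plain column
    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    // sourceFields == {"firstName", "lastName", "age"}; "fullName" itself is not read from the source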
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
index 2c3b3f7..636262a 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,9 +29,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -43,7 +44,7 @@ import org.testng.annotations.Test;
 public abstract class AbstractRecordExtractorTest {
 
   protected Schema _pinotSchema;
-  protected List<String> _sourceFieldNames;
+  protected Set<String> _sourceFieldNames;
   protected List<Map<String, Object>> _inputRecords;
   private RecordReader _recordReader;
  protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordTransformationTest");
@@ -53,7 +54,7 @@ public abstract class AbstractRecordExtractorTest {
       throws IOException {
     FileUtils.forceMkdir(_tempDir);
     _pinotSchema = getPinotSchema();
-    _sourceFieldNames = new ArrayList<>(SchemaFieldExtractorUtils.extractSource(_pinotSchema));
+    _sourceFieldNames = getSourceFields();
     _inputRecords = getInputRecords();
     createInputFile();
     _recordReader = createRecordReader();
@@ -90,6 +91,10 @@ public abstract class AbstractRecordExtractorTest {
     return inputRecords;
   }
 
+  protected Set<String> getSourceFields() {
+    return Sets.newHashSet("user_id", "firstName", "lastName", "bids", "campaignInfo", "cost", "timestamp");
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
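A hypothetical subclass showing the new hook in a format-specific extractor
test; the class name and field subset are assumptions, and the remaining
abstract members of a real test are omitted, so this is a fragment rather than
a compilable unit.

    // Hypothetical override; other members of a concrete extractor test omitted.
    public class MyFormatRecordExtractorTest extends AbstractRecordExtractorTest {

      @Override
      protected Set<String> getSourceFields() {
        // Declare the source fields explicitly instead of re-deriving them
        // from the schema via the removed extractSource(...) helper.
        return Sets.newHashSet("user_id", "cost", "timestamp");
      }
    }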
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 085d609..7303a73 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -41,6 +43,7 @@ public abstract class AbstractRecordReaderTest {
  protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
   protected List<Map<String, Object>> _records;
   protected org.apache.pinot.spi.data.Schema _pinotSchema;
+  protected Set<String> _sourceFields;
   private RecordReader _recordReader;
 
  private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
@@ -123,12 +126,17 @@ public abstract class AbstractRecordReaderTest {
         .addMetric("met_double", FieldSpec.DataType.DOUBLE).build();
   }
 
+  protected Set<String> getSourceFields(Schema schema) {
+    return Sets.newHashSet(schema.getColumnNames());
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
     FileUtils.forceMkdir(_tempDir);
     // Generate Pinot schema
     _pinotSchema = getPinotSchema();
+    _sourceFields = getSourceFields(_pinotSchema);
     // Generate random records based on Pinot schema
     _records = generateRandomRecords(_pinotSchema);
     // Write generated random records to file
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
index 0dfd4ee..23659cf 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
@@ -73,7 +73,7 @@ public class PluginManagerTest {
         PluginManager.get().load("test-record-reader", jarDirFile);
 
        RecordReader testRecordReader = PluginManager.get().createInstance("test-record-reader", "TestRecordReader");
-        testRecordReader.init(null, null, null);
+        testRecordReader.init(null, null, null, null);
         int count = 0;
         while (testRecordReader.hasNext()) {
           GenericRow row = testRecordReader.next();
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
index c8cd285..7c1f038 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -49,7 +48,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, 
argument2)");
     schema.addField(dimensionFieldSpec);
 
-    List<String> extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    List<String> extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 3);
     Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", 
"argument2")));
 
@@ -59,7 +58,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec.setTransformFunction("Groovy({function})");
     schema.addField(dimensionFieldSpec);
 
-    extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("d1"));
 
@@ -68,7 +67,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec = new DimensionFieldSpec("map__KEYS", 
FieldSpec.DataType.INT, false);
     schema.addField(dimensionFieldSpec);
 
-    extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__KEYS")));
 
@@ -77,7 +76,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec = new DimensionFieldSpec("map__VALUES", 
FieldSpec.DataType.LONG, false);
     schema.addField(dimensionFieldSpec);
 
-    extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("map", 
"map__VALUES")));
 
@@ -87,7 +86,7 @@ public class SchemaFieldExtractorUtilsTest {
     TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time", 
FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
     schema.addField(timeFieldSpec);
 
-    extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("time"));
 
@@ -98,7 +97,7 @@ public class SchemaFieldExtractorUtilsTest {
             TimeUnit.MILLISECONDS);
     schema.addField(timeFieldSpec);
 
-    extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+    extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
   }
diff --git a/pinot-spi/src/test/resources/TestRecordReader.java b/pinot-spi/src/test/resources/TestRecordReader.java
index 8f212a1..a223d58 100644
--- a/pinot-spi/src/test/resources/TestRecordReader.java
+++ b/pinot-spi/src/test/resources/TestRecordReader.java
@@ -3,6 +3,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -21,7 +22,7 @@ public class TestRecordReader implements RecordReader {
   Iterator<GenericRow> _iterator;
   Schema _schema;
 
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> fields)
       throws IOException {
     _schema = schema;
     int numRows = 10;
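The file-reader half of the same change, as a sketch under the same caveats:
the reader receives the field set at init and projects each record while
reading. Types and projection logic here are assumptions, not the plugin
implementations touched by this commit.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical reader sketch mirroring the init(...) shape above.
    public class FieldFilteringReaderSketch {
      private Set<String> _fields;
      private Iterator<Map<String, Object>> _iterator;

      public void init(List<Map<String, Object>> records, Set<String> fields) {
        _fields = fields;
        _iterator = records.iterator();
      }

      public boolean hasNext() {
        return _iterator.hasNext();
      }

      public Map<String, Object> next() {
        Map<String, Object> record = _iterator.next();
        Map<String, Object> row = new HashMap<>();
        for (String field : _fields) {
          // Project the raw record down to the requested source fields.
          row.put(field, record.get(field));
        }
        return row;
      }
    }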
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index 298387a..7b52630 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -26,7 +26,6 @@ import org.apache.pinot.tools.admin.command.AddTableCommand;
 import org.apache.pinot.tools.admin.command.AddTenantCommand;
 import org.apache.pinot.tools.admin.command.AnonymizeDataCommand;
 import org.apache.pinot.tools.admin.command.AvroSchemaToPinotSchema;
-import org.apache.pinot.tools.admin.command.BackfillDateTimeColumnCommand;
 import org.apache.pinot.tools.admin.command.ChangeNumReplicasCommand;
 import org.apache.pinot.tools.admin.command.ChangeTableState;
 import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
@@ -119,7 +118,6 @@ public class PinotAdministrator {
       @SubCommand(name = "VerifySegmentState", impl = 
VerifySegmentState.class),
       @SubCommand(name = "ConvertPinotSegment", impl = 
PinotSegmentConvertCommand.class),
       @SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
-      @SubCommand(name = "BackfillSegmentColumn", impl = 
BackfillDateTimeColumnCommand.class),
       @SubCommand(name = "VerifyClusterState", impl = 
VerifyClusterStateCommand.class),
       @SubCommand(name = "RealtimeProvisioningHelper", impl = 
RealtimeProvisioningHelperCommand.class),
       @SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
deleted file mode 100644
index bb0e3dc..0000000
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tools.admin.command;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
-import org.apache.pinot.core.minion.BackfillDateTimeColumn;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.tools.Command;
-import org.apache.pinot.tools.backfill.BackfillSegmentUtils;
-import org.kohsuke.args4j.Option;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class to download a segment, and backfill it with dateTimeFieldSpec corresponding to the timeFieldSpec
- *
- */
-public class BackfillDateTimeColumnCommand extends AbstractBaseAdminCommand implements Command {
-  private static final String OUTPUT_FOLDER = "output";
-  private static final String DOWNLOAD_FOLDER = "download";
-  private static final String BACKUP_FOLDER = "backup";
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(BackfillDateTimeColumnCommand.class);
-
-  @Option(name = "-controllerHost", required = true, metaVar = "<String>", 
usage = "host name for controller.")
-  private String _controllerHost;
-
-  @Option(name = "-controllerPort", required = true, metaVar = "<int>", usage 
= "Port number for controller.")
-  private String _controllerPort = DEFAULT_CONTROLLER_PORT;
-
-  @Option(name = "-tableName", required = true, metaVar = "<string>", usage = 
"Name of the table to backfill")
-  private String _tableName;
-
-  @Option(name = "-segmentNames", required = false, metaVar = "<string>", 
usage = "Comma separated names of the segments to backfill (if not specified, 
all segments will be backfilled)")
-  private String _segmentNames;
-
-  @Option(name = "-segmentType", required = false, metaVar = 
"<OFFLINE/REALTIME>", usage = "Type of segments to backfill (if not specified, 
all types will be backfilled)")
-  private SegmentType _segmentType;
-
-  @Option(name = "-srcTimeFieldSpec", required = true, metaVar = "<string>", 
usage = "File containing timeFieldSpec as json")
-  private String _srcTimeFieldSpec;
-
-  @Option(name = "-destDateTimeFieldSpec", required = true, metaVar = 
"<string>", usage = "File containing dateTimeFieldSpec as json")
-  private String _destDateTimeFieldSpec;
-
-  @Option(name = "-backupDir", required = true, metaVar = "<string>", usage = 
"Path to backup segments")
-  private String _backupDir;
-
-  @Option(name = "-help", required = false, help = true, aliases = {"-h", 
"--h", "--help"}, usage = "Print this message.")
-  private boolean _help = false;
-
-  public BackfillDateTimeColumnCommand setControllerHost(String controllerHost) {
-    _controllerHost = controllerHost;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setControllerPort(String controllerPort) {
-    _controllerPort = controllerPort;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setTableName(String tableName) {
-    _tableName = tableName;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setSegmentNames(String segmentNames) {
-    _segmentNames = segmentNames;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setSegmentType(SegmentType segmentType) {
-    _segmentType = segmentType;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setSrcTimeFieldSpec(String srcTimeFieldSpec) {
-    _srcTimeFieldSpec = srcTimeFieldSpec;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setDestDateTimeFieldSpec(String destDateTimeFieldSpec) {
-    _destDateTimeFieldSpec = destDateTimeFieldSpec;
-    return this;
-  }
-
-  public BackfillDateTimeColumnCommand setBackupDir(String backupDir) {
-    _backupDir = backupDir;
-    return this;
-  }
-
-  @Override
-  public String toString() {
-    return ("BackfillSegmentColumn  -controllerHost " + _controllerHost + " 
-controllerPort " + _controllerPort
-        + " -tableName " + _tableName + " -segmentNames " + _segmentNames + " 
-segmentType " + _segmentType
-        + " -srcTimeFieldSpec " + _srcTimeFieldSpec + " _destDateTimeFieldSpec 
" + _destDateTimeFieldSpec
-        + " -backupDir " + _backupDir);
-  }
-
-  @Override
-  public final String getName() {
-    return "BackfillSegmentColumn";
-  }
-
-  @Override
-  public String description() {
-    return "Backfill a column in segments of a pinot table, with a millis 
value corresponding to the time column";
-  }
-
-  @Override
-  public boolean getHelp() {
-    return _help;
-  }
-
-  @Override
-  public boolean execute()
-      throws Exception {
-    LOGGER.info("Executing command: {}", toString());
-
-    if (_controllerHost == null || _controllerPort == null) {
-      throw new RuntimeException("Must specify controller host and port.");
-    }
-
-    if (_backupDir == null) {
-      throw new RuntimeException("Must specify path to backup segments");
-    }
-
-    if (_srcTimeFieldSpec == null || _destDateTimeFieldSpec == null) {
-      throw new RuntimeException("Must specify srcTimeFieldSpec and 
destTimeFieldSpec.");
-    }
-    TimeFieldSpec timeFieldSpec = JsonUtils.fileToObject(new 
File(_srcTimeFieldSpec), TimeFieldSpec.class);
-    DateTimeFieldSpec dateTimeFieldSpec =
-        JsonUtils.fileToObject(new File(_destDateTimeFieldSpec), 
DateTimeFieldSpec.class);
-
-    if (_tableName == null) {
-      throw new RuntimeException("Must specify tableName.");
-    }
-
-    BackfillSegmentUtils backfillSegmentUtils = new BackfillSegmentUtils(_controllerHost, _controllerPort);
-
-    List<String> segmentNames = new ArrayList<>();
-    List<String> allSegmentNames = backfillSegmentUtils.getAllSegments(_tableName, _segmentType);
-    if (_segmentNames == null) {
-      segmentNames = allSegmentNames;
-    } else {
-      for (String segmentName : _segmentNames.split(",")) {
-        if (allSegmentNames.contains(segmentName)) {
-          segmentNames.add(segmentName);
-        } else {
-          throw new RuntimeException("Segment with name " + segmentName + " 
does not exist.");
-        }
-      }
-    }
-
-    File backupDir = new File(_backupDir, BACKUP_FOLDER);
-    File tableBackupDir = new File(backupDir, _tableName);
-    File downloadDir = new File(TMP_DIR, DOWNLOAD_FOLDER);
-    LOGGER.info("Backup dir {}", tableBackupDir);
-    LOGGER.info("DownloadDir {}", downloadDir);
-
-    for (String segmentName : segmentNames) {
-
-      LOGGER.info("\n\nSegment {}", segmentName);
-
-      // download segment
-      File downloadSegmentDir = new File(downloadDir, segmentName);
-      LOGGER.info("Downloading segment {} to {}", segmentName, 
downloadDir.getAbsolutePath());
-      boolean downloadStatus =
-          backfillSegmentUtils.downloadSegment(_tableName, segmentName, 
downloadSegmentDir, tableBackupDir);
-      LOGGER.info("Download status for segment {} is {}", segmentName, 
downloadStatus);
-      if (!downloadStatus) {
-        LOGGER.error("Failed to download segment {}. Skipping it.", 
segmentName);
-        continue;
-      }
-
-      // create new segment
-      String rawTableName = TableNameBuilder.extractRawTableName(_tableName);
-      File segmentDir = new File(downloadSegmentDir, segmentName);
-      File outputDir = new File(downloadSegmentDir, OUTPUT_FOLDER);
-      BackfillDateTimeColumn backfillDateTimeColumn =
-          new BackfillDateTimeColumn(rawTableName, segmentDir, outputDir, timeFieldSpec, dateTimeFieldSpec);
-      boolean backfillStatus = backfillDateTimeColumn.backfill();
-      LOGGER
-          .info("Backfill status for segment {} in {} to {} is {}", segmentName, segmentDir, outputDir, backfillStatus);
-
-      // upload segment
-      LOGGER.info("Uploading segment {} to host: {} port: {}", segmentName, 
_controllerHost, _controllerPort);
-      backfillSegmentUtils.uploadSegment(_tableName, segmentName, new 
File(outputDir, segmentName), outputDir);
-    }
-
-    // verify that all segments exist
-    List<String> missingSegments = new ArrayList<>();
-    allSegmentNames = backfillSegmentUtils.getAllSegments(_tableName, _segmentType);
-    for (String segmentName : segmentNames) {
-      if (!allSegmentNames.contains(segmentName)) {
-        missingSegments.add(segmentName);
-      }
-    }
-    if (missingSegments.size() != 0) {
-      LOGGER.error("Failed to backfill and upload segments {}", 
missingSegments);
-      return false;
-    }
-
-    LOGGER.info("Original segment backup is at {}", 
tableBackupDir.getAbsolutePath());
-    return true;
-  }
-}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 8424c82..680b808 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -364,7 +365,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
                   if (_readerConfigFile != null) {
                    readerConfig = JsonUtils.fileToObject(new File(_readerConfigFile), CSVRecordReaderConfig.class);
                   }
-                  csvRecordReader.init(localFile, schema, readerConfig);
+                  csvRecordReader.init(localFile, schema, readerConfig, SchemaFieldExtractorUtils.extractSourceFields(schema));
                   driver.init(config, csvRecordReader);
                   break;
                 default:
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index af80892..f78ed77 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.glassfish.tyrus.client.ClientManager;
 
@@ -67,7 +68,7 @@ public class MeetupRsvpStream {
     try {
       final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
       final StreamMessageDecoder decoder = PluginManager.get().createInstance(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME);
-      decoder.init(null, schema, null);
+      decoder.init(null, schema, null, SchemaFieldExtractorUtils.extractSourceFields(schema));
       client = ClientManager.createClient();
       client.connectToServer(new Endpoint() {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
