This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9912b47  Refactor in preparation for the ORCRecordReader (#3900)
9912b47 is described below

commit 9912b47959c1a9c0915588b4ac60f8ca836a8e27
Author: Jennifer Dai <[email protected]>
AuthorDate: Thu Mar 7 14:31:38 2019 -0800

    Refactor in preparation for the ORCRecordReader (#3900)
    
    * Refactoring record reader to allow for customized RecordReader configuration during segment creation.
---
 .../pinot/core/data/readers/AvroRecordReader.java   |  6 ++++++
 .../pinot/core/data/readers/CSVRecordReader.java    |  6 ++++++
 .../apache/pinot/core/data/readers/FileFormat.java  |  2 +-
 .../core/data/readers/GenericRowRecordReader.java   |  6 ++++++
 .../pinot/core/data/readers/JSONRecordReader.java   |  7 +++++++
 .../readers/MultiplePinotSegmentRecordReader.java   |  6 ++++++
 .../core/data/readers/PinotSegmentRecordReader.java |  6 ++++++
 .../pinot/core/data/readers/RecordReader.java       |  6 ++++++
 .../core/data/readers/RecordReaderFactory.java      | 21 +++++++++++++++++++++
 .../pinot/core/data/readers/ThriftRecordReader.java |  6 ++++++
 .../generator/SegmentGeneratorConfig.java           | 10 ++++++++++
 .../pinot/core/minion/BackfillDateTimeColumn.java   |  5 +++++
 .../org/apache/pinot/core/minion/SegmentPurger.java |  5 +++++
 .../core/minion/segment/MapperRecordReader.java     |  6 ++++++
 .../core/minion/segment/ReducerRecordReader.java    |  6 ++++++
 .../converter/RealtimeSegmentRecordReader.java      |  6 ++++++
 .../apache/pinot/hadoop/job/JobConfigConstants.java |  3 +++
 .../hadoop/job/mapper/SegmentCreationMapper.java    | 14 +++++++++++---
 18 files changed, 123 insertions(+), 4 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index d95034b..c5ca4cc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.util.AvroUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,11 @@ public class AvroRecordReader implements RecordReader {
     }
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   private void validateSchema() {
     org.apache.avro.Schema avroSchema = _avroReader.getSchema();
     for (FieldSpec fieldSpec : _fieldSpecs) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
index 6d00351..00cce5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -92,6 +93,11 @@ public class CSVRecordReader implements RecordReader {
     init();
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   private void init()
       throws IOException {
     _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 0826cd5..5f7120d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.core.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index 8fb1bbd..5615f3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -41,6 +42,11 @@ public class GenericRowRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     return _nextRowId < _numRows;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
index bbe786a..23ec05b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -43,6 +44,7 @@ public class JSONRecordReader implements RecordReader {
   private JsonParser _parser;
   private Iterator<Map> _iterator;
 
+
   public JSONRecordReader(File dataFile, Schema schema)
       throws IOException {
     _dataFile = dataFile;
@@ -64,6 +66,11 @@ public class JSONRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     return _iterator.hasNext();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 9489719..6448a63 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -53,6 +54,11 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
     this(indexDirs, null, null);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   /**
    * Read records using the passed in schema and in the order of sorted column from multiple pinot segments.
    * <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index bd3fd6f..50c1808 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.sort.PinotSegmentSorter;
 import org.apache.pinot.core.data.readers.sort.SegmentSorter;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -59,6 +60,11 @@ public class PinotSegmentRecordReader implements RecordReader {
     this(indexDir, null, null);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   /**
    * Read records using the segment schema with the given schema and sort order
    * <p>Passed in schema must be a subset of the segment schema.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
index bb57fe7..338b91b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -33,6 +34,11 @@ import org.apache.pinot.core.data.GenericRow;
 public interface RecordReader extends Closeable {
 
   /**
+   * Initializes the record reader when needed
+   */
+  void init(SegmentGeneratorConfig segmentGeneratorConfig);
+
+  /**
    * Return <code>true</code> if more records remain to be read.
    */
   boolean hasNext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index 7867005..7909905 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -22,9 +22,14 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class RecordReaderFactory {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RecordReaderFactory.class);
+
   private RecordReaderFactory() {
   }
 
@@ -35,6 +40,22 @@ public class RecordReaderFactory {
 
     Schema schema = segmentGeneratorConfig.getSchema();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
+    String recordReaderPath = segmentGeneratorConfig.getRecordReaderPath();
+
+    // Allow for instantiation general record readers from a record reader path passed into segment generator config
+    // If this is set, this will override the file format
+    if (recordReaderPath != null) {
+      if (fileFormat != FileFormat.OTHER) {
+        // We currently have default file format set to AVRO inside segment generator config,
+        // do not want to break this behavior for clients.
+        LOGGER.warn("Using recordReaderPath {} to read segment, ignoring 
fileformat {}",
+            recordReaderPath, fileFormat.toString());
+      }
+      RecordReader recordReader = (RecordReader) 
Class.forName(recordReaderPath).newInstance();
+      recordReader.init(segmentGeneratorConfig);
+      return recordReader;
+    }
+
     switch (fileFormat) {
       case AVRO:
       case GZIPPED_AVRO:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
index b19ca2a..1f24010 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TFieldIdEnum;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -77,6 +78,11 @@ public class ThriftRecordReader implements RecordReader {
     }
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   private boolean hasMoreToRead()
       throws IOException {
     _inputStream.mark(1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 6a5020b..bc176bd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -83,6 +83,7 @@ public class SegmentGeneratorConfig {
   private String _dataDir = null;
   private String _inputFilePath = null;
   private FileFormat _format = FileFormat.AVRO;
+  private String _recordReaderPath = null;
   private String _outDir = null;
   private boolean _overwrite = false;
   private String _tableName = null;
@@ -156,6 +157,7 @@ public class SegmentGeneratorConfig {
     _timeColumnType = config._timeColumnType;
     _simpleDateFormat = config._simpleDateFormat;
     _onHeap = config._onHeap;
+    _recordReaderPath = config._recordReaderPath;
   }
 
   /**
@@ -323,6 +325,14 @@ public class SegmentGeneratorConfig {
     _format = format;
   }
 
+  public String getRecordReaderPath() {
+    return _recordReaderPath;
+  }
+
+  public void setRecordReaderPath(String recordReaderPath) {
+    _recordReaderPath = recordReaderPath;
+  }
+
   public String getOutDir() {
     return _outDir;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index ead3305..9d786b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -136,6 +136,11 @@ public class BackfillDateTimeColumn {
     }
 
     @Override
+    public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+    }
+
+    @Override
     public boolean hasNext() {
       return _baseRecordReader.hasNext();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 90e32d3..9581310 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -151,6 +151,11 @@ public class SegmentPurger {
     }
 
     @Override
+    public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+    }
+
+    @Override
     public boolean hasNext() {
       if (_recordPurger == null) {
         return _recordReader.hasNext();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 609a1f2..42abbd3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
 import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -54,6 +55,11 @@ public class MapperRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     if (_finished) {
       return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 2fbccb6..80e6507 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.readers.RecordReader;
 import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 
 
 /**
@@ -51,6 +52,11 @@ public class ReducerRecordReader implements RecordReader {
   }
 
   @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
+  @Override
   public boolean hasNext() {
     if (_finished) {
       return false;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index dc35c1d..235ed00 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.converter;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 
 
@@ -49,6 +50,11 @@ public class RealtimeSegmentRecordReader implements RecordReader {
     _sortedDocIdIterationOrder = 
realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn);
   }
 
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+
+  }
+
   public int[] getSortedDocIdIterationOrder() {
     return _sortedDocIdIterationOrder;
   }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 1a50c5c..35557b5 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -49,4 +49,7 @@ public class JobConfigConstants {
   public static final String PUSH_TO_PORT = "push.to.port";
 
   public static final String DEFAULT_PERMISSIONS_MASK = 
"fs.permissions.umask-mode";
+
+  // The path to the record reader to be configured
+  public static final String RECORD_READER_PATH = "record.reader.path";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index 705e1ad..21f3f16 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -69,6 +69,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
 
   // Optional
   protected TableConfig _tableConfig;
+  protected String _recordReaderPath;
   protected Path _readerConfigFile;
 
   // HDFS segment tar directory
@@ -101,6 +102,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
       _readerConfigFile = new Path(readerConfigFile);
     }
 
+    _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH, 
null);
+
     // Set up segment name generator
     String segmentNameGeneratorType =
         _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, 
JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
@@ -204,9 +207,14 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
     segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
     segmentGeneratorConfig.setSequenceId(sequenceId);
-    FileFormat fileFormat = getFileFormat(inputFileName);
-    segmentGeneratorConfig.setFormat(fileFormat);
-    segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+    segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
+    if (_recordReaderPath != null) {
+      segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+    } else {
+      FileFormat fileFormat = getFileFormat(inputFileName);
+      segmentGeneratorConfig.setFormat(fileFormat);
+      segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+    }
     segmentGeneratorConfig.setOnHeap(true);
 
     addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, 
hdfsInputFile, sequenceId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to