This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch no_direct_record_reader_constructor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 5823528605871d774951cb38d030d4d8bc8eb293
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Dec 4 13:19:29 2019 -0800

    Move all file-format-based record readers to be constructed via class name 
and an init method
---
 .../core/data/readers/RecordReaderFactory.java     | 58 ++++++++++++++++------
 .../data/readers/RecordReaderSampleDataTest.java   | 54 ++++++++++++--------
 .../MutableSegmentImplNullValueVectorTest.java     | 27 +++++-----
 .../mutable/MutableSegmentImplTest.java            | 18 +++----
 .../pinot/avro/data/readers/AvroRecordReader.java  | 16 +++---
 .../avro/data/readers/AvroRecordReaderTest.java    |  4 +-
 .../CSVRecordReader.java                           | 23 ++++-----
 .../CSVRecordReaderTest.java                       |  4 +-
 .../JSONRecordReader.java                          | 21 ++++----
 .../JSONRecordReaderTest.java                      |  4 +-
 .../ThriftRecordReader.java                        | 48 ++++++++++--------
 .../ThriftRecordReaderTest.java                    | 11 ++--
 12 files changed, 168 insertions(+), 120 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index 5649151..5492eca 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -20,15 +20,12 @@ package org.apache.pinot.core.data.readers;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
-import org.apache.pinot.avro.data.readers.AvroRecordReader;
-import org.apache.pinot.csv.data.readers.CSVRecordReader;
-import org.apache.pinot.csv.data.readers.CSVRecordReaderConfig;
-import org.apache.pinot.json.data.readers.JSONRecordReader;
-import org.apache.pinot.spi.data.Schema;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.thrift.data.readers.ThriftRecordReader;
-import org.apache.pinot.thrift.data.readers.ThriftRecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,10 +33,47 @@ import org.slf4j.LoggerFactory;
 public class RecordReaderFactory {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RecordReaderFactory.class);
+  private static final Map<FileFormat, String> DEFAULT_RECORD_READER_CLASS_MAP 
= new HashMap<>();
+
+  private static final String DEFAULT_AVRO_RECORD_READER_CLASS = 
"org.apache.pinot.avro.data.readers.AvroRecordReader";
+  private static final String DEFAULT_CSV_RECORD_READER_CLASS = 
"org.apache.pinot.csv.data.readers.CSVRecordReader";
+  private static final String DEFAULT_JSON_RECORD_READER_CLASS = 
"org.apache.pinot.json.data.readers.JSONRecordReader";
+  private static final String DEFAULT_THRIFT_RECORD_READER_CLASS =
+      "org.apache.pinot.thrift.data.readers.ThriftRecordReader";
+  private static final String DEFAULT_ORC_RECORD_READER_CLASS = 
"org.apache.pinot.orc.data.readers.ORCRecordReader";
+  private static final String DEFAULT_PARQUET_RECORD_READER_CLASS =
+      "org.apache.pinot.parquet.data.readers.ParquetRecordReader";
+
+  static {
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.AVRO, 
DEFAULT_AVRO_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.GZIPPED_AVRO, 
DEFAULT_AVRO_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.CSV, 
DEFAULT_CSV_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.JSON, 
DEFAULT_JSON_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.THRIFT, 
DEFAULT_THRIFT_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.ORC, 
DEFAULT_ORC_RECORD_READER_CLASS);
+    DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.PARQUET, 
DEFAULT_PARQUET_RECORD_READER_CLASS);
+  }
 
   private RecordReaderFactory() {
   }
 
+  public static RecordReader getRecordReader(String recordReaderClassName, 
File dataFile, Schema schema,
+      RecordReaderConfig config)
+      throws Exception {
+    RecordReader recordReader = (RecordReader) 
Class.forName(recordReaderClassName).newInstance();
+    recordReader.init(dataFile, schema, config);
+    return recordReader;
+  }
+
+  public static RecordReader getRecordReader(FileFormat fileFormat, File 
dataFile, Schema schema,
+      RecordReaderConfig config)
+      throws Exception {
+    if (DEFAULT_RECORD_READER_CLASS_MAP.containsKey(fileFormat)) {
+      return getRecordReader(DEFAULT_RECORD_READER_CLASS_MAP.get(fileFormat), 
dataFile, schema, config);
+    }
+    throw new UnsupportedOperationException("No supported RecordReader found 
for file format - '" + fileFormat + "'");
+  }
+
   public static RecordReader getRecordReader(SegmentGeneratorConfig 
segmentGeneratorConfig)
       throws Exception {
     File dataFile = new File(segmentGeneratorConfig.getInputFilePath());
@@ -58,22 +92,16 @@ public class RecordReaderFactory {
         LOGGER
             .warn("Using class: {} to read segment, ignoring configured file 
format: {}", recordReaderPath, fileFormat);
       }
-      RecordReader recordReader = (RecordReader) 
Class.forName(recordReaderPath).newInstance();
-      recordReader.init(dataFile, schema, 
segmentGeneratorConfig.getReaderConfig());
-      return recordReader;
+      return getRecordReader(recordReaderPath, dataFile, schema, 
segmentGeneratorConfig.getReaderConfig());
     }
 
     switch (fileFormat) {
       case AVRO:
       case GZIPPED_AVRO:
-        return new AvroRecordReader(dataFile, schema);
       case CSV:
-        return new CSVRecordReader(dataFile, schema, (CSVRecordReaderConfig) 
segmentGeneratorConfig.getReaderConfig());
       case JSON:
-        return new JSONRecordReader(dataFile, schema);
       case THRIFT:
-        return new ThriftRecordReader(dataFile, schema,
-            (ThriftRecordReaderConfig) 
segmentGeneratorConfig.getReaderConfig());
+        return getRecordReader(fileFormat, dataFile, schema, 
segmentGeneratorConfig.getReaderConfig());
       // NOTE: PinotSegmentRecordReader does not support time conversion 
(field spec must match)
       case PINOT:
         return new PinotSegmentRecordReader(dataFile, schema, 
segmentGeneratorConfig.getColumnSortOrder());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index 7f73865..a7404c0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -21,13 +21,11 @@ package org.apache.pinot.core.data.readers;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.avro.data.readers.AvroRecordReader;
-import org.apache.pinot.csv.data.readers.CSVRecordReader;
-import org.apache.pinot.json.data.readers.JSONRecordReader;
+import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -73,9 +71,12 @@ public class RecordReaderSampleDataTest {
   public void testRecordReaders()
       throws Exception {
     CompositeTransformer defaultTransformer = 
CompositeTransformer.getDefaultTransformer(SCHEMA);
-    try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA);
-        CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA, null);
-        JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA)) {
+    try (RecordReader avroRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null);
+        RecordReader csvRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, 
null);
+        RecordReader jsonRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, 
null)) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -115,10 +116,12 @@ public class RecordReaderSampleDataTest {
   @Test
   public void testSameIncomingOutgoing()
       throws Exception {
-    try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING);
-        CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING,
-            null); JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE,
-        SCHEMA_SAME_INCOMING_OUTGOING)) {
+    try (RecordReader avroRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, 
SCHEMA_SAME_INCOMING_OUTGOING, null);
+        RecordReader csvRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, 
SCHEMA_SAME_INCOMING_OUTGOING, null);
+        RecordReader jsonRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, 
SCHEMA_SAME_INCOMING_OUTGOING, null)) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -144,11 +147,12 @@ public class RecordReaderSampleDataTest {
   @Test
   public void testDifferentIncomingOutgoing()
       throws Exception {
-    try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE,
-        SCHEMA_DIFFERENT_INCOMING_OUTGOING);
-        CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING,
-            null); JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE,
-        SCHEMA_DIFFERENT_INCOMING_OUTGOING)) {
+    try (RecordReader avroRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, 
SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+        RecordReader csvRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, 
SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+        RecordReader jsonRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, 
SCHEMA_DIFFERENT_INCOMING_OUTGOING, null)) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -177,9 +181,12 @@ public class RecordReaderSampleDataTest {
   @Test
   public void testNoIncoming()
       throws Exception {
-    try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING);
-        CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null);
-        JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING)) {
+    try (RecordReader avroRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, 
SCHEMA_NO_INCOMING, null);
+        RecordReader csvRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, 
SCHEMA_NO_INCOMING, null);
+        RecordReader jsonRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, 
SCHEMA_NO_INCOMING, null)) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
@@ -208,9 +215,12 @@ public class RecordReaderSampleDataTest {
   @Test
   public void testNoOutgoing()
       throws Exception {
-    try (AvroRecordReader avroRecordReader = new 
AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING);
-        CSVRecordReader csvRecordReader = new 
CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null);
-        JSONRecordReader jsonRecordReader = new 
JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING)) {
+    try (RecordReader avroRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, 
SCHEMA_NO_OUTGOING, null);
+        RecordReader csvRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, 
SCHEMA_NO_OUTGOING, null);
+        RecordReader jsonRecordReader = RecordReaderFactory
+            .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, 
SCHEMA_NO_OUTGOING, null)) {
       int numRecords = 0;
       while (avroRecordReader.hasNext()) {
         assertTrue(csvRecordReader.hasNext());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
index dccfc17..3ac25f9 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
@@ -18,24 +18,23 @@
  */
 package org.apache.pinot.core.indexsegment.mutable;
 
-import org.apache.pinot.json.data.readers.JSONRecordReader;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderFactory;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
 import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
 
 public class MutableSegmentImplNullValueVectorTest {
   private static final String PINOT_SCHEMA_FILE_PATH = 
"data/test_null_value_vector_pinot_schema.json";
@@ -47,17 +46,17 @@ public class MutableSegmentImplNullValueVectorTest {
 
   @BeforeClass
   public void setup()
-      throws IOException {
+      throws Exception {
     URL schemaResourceUrl = 
this.getClass().getClassLoader().getResource(PINOT_SCHEMA_FILE_PATH);
     URL dataResourceUrl = 
this.getClass().getClassLoader().getResource(DATA_FILE);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_schema);
-    File avroFile = new File(dataResourceUrl.getFile());
+    File jsonFile = new File(dataResourceUrl.getFile());
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
             false, true);
     GenericRow reuse = new GenericRow();
-    try (RecordReader recordReader = new JSONRecordReader(avroFile, _schema)) {
+    try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, _schema, null)) {
       while (recordReader.hasNext()) {
         recordReader.next(reuse);
         GenericRow transformedRow = _recordTransformer.transform(reuse);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index f1b6b38..a8d8ac0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -22,18 +22,14 @@ import java.io.File;
 import java.net.URL;
 import java.util.Collections;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.avro.data.readers.AvroRecordReader;
-import org.apache.pinot.avro.data.readers.AvroUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.common.BlockMultiValIterator;
 import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.DataSourceMetadata;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReaderFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -42,6 +38,10 @@ import 
org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -77,13 +77,13 @@ public class MutableSegmentImplTest {
 
     _schema = config.getSchema();
     _mutableSegmentImpl = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(),
-            Collections.emptySet(),false);
+        .createMutableSegmentImpl(_schema, Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet(),
+            false);
     _lastIngestionTimeMs = System.currentTimeMillis();
     StreamMessageMetadata defaultMetadata = new 
StreamMessageMetadata(_lastIngestionTimeMs);
     _startTimeMs = System.currentTimeMillis();
 
-    try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) {
+    try (RecordReader recordReader = 
RecordReaderFactory.getRecordReader(FileFormat.AVRO, avroFile, _schema, null)) {
       GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
         _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
diff --git 
a/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java
 
b/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java
index 5759f90..977014a 100644
--- 
a/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java
+++ 
b/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java
@@ -36,14 +36,18 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  * Record reader for AVRO file.
  */
 public class AvroRecordReader implements RecordReader {
-  private final File _dataFile;
-  private final Schema _schema;
-  private final List<FieldSpec> _fieldSpecs;
+  private File _dataFile;
+  private Schema _schema;
+  private List<FieldSpec> _fieldSpecs;
 
   private DataFileStream<GenericRecord> _avroReader;
   private GenericRecord _reusableAvroRecord = null;
 
-  public AvroRecordReader(File dataFile, Schema schema)
+  public AvroRecordReader() {
+  }
+
+  @Override
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
@@ -58,10 +62,6 @@ public class AvroRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig) {
-  }
-
-  @Override
   public boolean hasNext() {
     return _avroReader.hasNext();
   }
diff --git 
a/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java
 
b/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java
index 9d4e534..ea889fe 100644
--- 
a/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java
+++ 
b/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java
@@ -38,7 +38,9 @@ public class AvroRecordReaderTest extends 
AbstractRecordReaderTest {
   @Override
   protected RecordReader createRecordReader()
       throws Exception {
-    return new AvroRecordReader(_dataFile, getPinotSchema());
+    AvroRecordReader avroRecordReader = new AvroRecordReader();
+    avroRecordReader.init(_dataFile, getPinotSchema(), null);
+    return avroRecordReader;
   }
 
   @Override
diff --git 
a/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java
 
b/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java
index 2a066a9..df4d2b4 100644
--- 
a/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java
+++ 
b/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java
@@ -39,21 +39,25 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  * Record reader for CSV file.
  */
 public class CSVRecordReader implements RecordReader {
-  private final File _dataFile;
-  private final Schema _schema;
-  private final List<FieldSpec> _fieldSpecs;
-  private final CSVFormat _format;
-  private final char _multiValueDelimiter;
+  private File _dataFile;
+  private Schema _schema;
+  private List<FieldSpec> _fieldSpecs;
+  private CSVFormat _format;
+  private char _multiValueDelimiter;
 
   private CSVParser _parser;
   private Iterator<CSVRecord> _iterator;
 
-  public CSVRecordReader(File dataFile, Schema schema, CSVRecordReaderConfig 
config)
+  public CSVRecordReader() {
+  }
+
+  @Override
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig)
       throws IOException {
     _dataFile = dataFile;
     _schema = schema;
     _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
-
+    CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig;
     if (config == null) {
       _format = 
CSVFormat.DEFAULT.withDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).withHeader();
       _multiValueDelimiter = 
CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
@@ -92,7 +96,6 @@ public class CSVRecordReader implements RecordReader {
       _format = format;
       _multiValueDelimiter = config.getMultiValueDelimiter();
     }
-
     init();
   }
 
@@ -103,10 +106,6 @@ public class CSVRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig) {
-  }
-
-  @Override
   public boolean hasNext() {
     return _iterator.hasNext();
   }
diff --git 
a/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java
 
b/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java
index ad22639..d5524da 100644
--- 
a/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java
+++ 
b/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java
@@ -39,7 +39,9 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
       throws Exception {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
-    return new CSVRecordReader(_dataFile, getPinotSchema(), 
csvRecordReaderConfig);
+    CSVRecordReader csvRecordReader = new CSVRecordReader();
+    csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig);
+    return csvRecordReader;
   }
 
   @Override
diff --git 
a/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java
 
b/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java
index ca6b63f..3d01f1d 100644
--- 
a/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java
+++ 
b/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java
@@ -38,19 +38,13 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * Record reader for JSON file.
  */
 public class JSONRecordReader implements RecordReader {
-  private final File _dataFile;
-  private final Schema _schema;
-  private final List<FieldSpec> _fieldSpecs;
+  private File _dataFile;
+  private Schema _schema;
+  private List<FieldSpec> _fieldSpecs;
 
   private MappingIterator<Map<String, Object>> _iterator;
 
-  public JSONRecordReader(File dataFile, Schema schema)
-      throws IOException {
-    _dataFile = dataFile;
-    _schema = schema;
-    _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
-
-    init();
+  public JSONRecordReader() {
   }
 
   private void init()
@@ -65,7 +59,12 @@ public class JSONRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig)
+      throws IOException {
+    _dataFile = dataFile;
+    _schema = schema;
+    _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
+    init();
   }
 
   @Override
diff --git 
a/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java
 
b/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java
index d2eabae..df40c77 100644
--- 
a/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java
+++ 
b/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java
@@ -34,7 +34,9 @@ public class JSONRecordReaderTest extends 
AbstractRecordReaderTest {
   @Override
   protected RecordReader createRecordReader()
       throws Exception {
-    return new JSONRecordReader(_dateFile, getPinotSchema());
+    JSONRecordReader recordReader = new JSONRecordReader();
+    recordReader.init(_dateFile, getPinotSchema(), null);
+    return recordReader;
   }
 
   @Override
diff --git 
a/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java
 
b/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java
index 5a00aa0..fd16e72 100644
--- 
a/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java
+++ 
b/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java
@@ -42,31 +42,17 @@ import org.apache.thrift.transport.TIOStreamTransport;
  * Record reader for Thrift file.
  */
 public class ThriftRecordReader implements RecordReader {
-  private final File _dataFile;
-  private final Schema _schema;
-  private final List<FieldSpec> _fieldSpecs;
-  private final Class<?> _thriftClass;
-  private final Map<String, Integer> _fieldIds = new HashMap<>();
+  private File _dataFile;
+  private Schema _schema;
+  private List<FieldSpec> _fieldSpecs;
+  private Class<?> _thriftClass;
+  private Map<String, Integer> _fieldIds = new HashMap<>();
 
   private InputStream _inputStream;
   private TProtocol _tProtocol;
   private boolean _hasNext;
 
-  public ThriftRecordReader(File dataFile, Schema schema, 
ThriftRecordReaderConfig recordReaderConfig)
-      throws IOException, ClassNotFoundException, IllegalAccessException, 
InstantiationException {
-    _dataFile = dataFile;
-    _schema = schema;
-    _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
-    _thriftClass = Class.forName(recordReaderConfig.getThriftClass());
-    TBase tObject = (TBase) _thriftClass.newInstance();
-    int index = 1;
-    TFieldIdEnum tFieldIdEnum;
-    while ((tFieldIdEnum = tObject.fieldForId(index)) != null) {
-      _fieldIds.put(tFieldIdEnum.getFieldName(), index);
-      index++;
-    }
-
-    init();
+  public ThriftRecordReader() {
   }
 
   private void init()
@@ -90,7 +76,27 @@ public class ThriftRecordReader implements RecordReader {
   }
 
   @Override
-  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
recordReaderConfig) {
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig 
config)
+      throws IOException {
+    ThriftRecordReaderConfig recordReaderConfig = (ThriftRecordReaderConfig) 
config;
+    _dataFile = dataFile;
+    _schema = schema;
+    _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema);
+    TBase tObject;
+    try {
+      _thriftClass = Class.forName(recordReaderConfig.getThriftClass());
+      tObject = (TBase) _thriftClass.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    int index = 1;
+    TFieldIdEnum tFieldIdEnum;
+    while ((tFieldIdEnum = tObject.fieldForId(index)) != null) {
+      _fieldIds.put(tFieldIdEnum.getFieldName(), index);
+      index++;
+    }
+
+    init();
   }
 
   @Override
diff --git 
a/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java
 
b/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java
index 54c1446..465d9cf 100644
--- 
a/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java
+++ 
b/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java
@@ -93,9 +93,9 @@ public class ThriftRecordReaderTest {
 
   @Test
   public void testReadData()
-      throws IOException, ClassNotFoundException, InstantiationException, 
IllegalAccessException {
-    ThriftRecordReader recordReader = new ThriftRecordReader(_tempFile, 
getSchema(), getThriftRecordReaderConfig());
-
+      throws IOException {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
     List<GenericRow> genericRows = new ArrayList<>();
     while (recordReader.hasNext()) {
       genericRows.add(recordReader.next());
@@ -112,8 +112,9 @@ public class ThriftRecordReaderTest {
 
   @Test
   public void testRewind()
-      throws ClassNotFoundException, InstantiationException, 
IllegalAccessException, IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader(_tempFile, 
getSchema(), getThriftRecordReaderConfig());
+      throws IOException {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
     List<GenericRow> genericRows = new ArrayList<>();
     while (recordReader.hasNext()) {
       genericRows.add(recordReader.next());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to