This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch no_direct_record_reader_constructor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5823528605871d774951cb38d030d4d8bc8eb293 Author: Xiang Fu <[email protected]> AuthorDate: Wed Dec 4 13:19:29 2019 -0800 Move all file format based record reader to be constructed using class name and init method --- .../core/data/readers/RecordReaderFactory.java | 58 ++++++++++++++++------ .../data/readers/RecordReaderSampleDataTest.java | 54 ++++++++++++-------- .../MutableSegmentImplNullValueVectorTest.java | 27 +++++----- .../mutable/MutableSegmentImplTest.java | 18 +++---- .../pinot/avro/data/readers/AvroRecordReader.java | 16 +++--- .../avro/data/readers/AvroRecordReaderTest.java | 4 +- .../CSVRecordReader.java | 23 ++++----- .../CSVRecordReaderTest.java | 4 +- .../JSONRecordReader.java | 21 ++++---- .../JSONRecordReaderTest.java | 4 +- .../ThriftRecordReader.java | 48 ++++++++++-------- .../ThriftRecordReaderTest.java | 11 ++-- 12 files changed, 168 insertions(+), 120 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java index 5649151..5492eca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java @@ -20,15 +20,12 @@ package org.apache.pinot.core.data.readers; import com.google.common.base.Preconditions; import java.io.File; -import org.apache.pinot.avro.data.readers.AvroRecordReader; -import org.apache.pinot.csv.data.readers.CSVRecordReader; -import org.apache.pinot.csv.data.readers.CSVRecordReaderConfig; -import org.apache.pinot.json.data.readers.JSONRecordReader; -import org.apache.pinot.spi.data.Schema; +import java.util.HashMap; +import java.util.Map; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.thrift.data.readers.ThriftRecordReader; -import org.apache.pinot.thrift.data.readers.ThriftRecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +33,47 @@ import org.slf4j.LoggerFactory; public class RecordReaderFactory { private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderFactory.class); + private static final Map<FileFormat, String> DEFAULT_RECORD_READER_CLASS_MAP = new HashMap<>(); + + private static final String DEFAULT_AVRO_RECORD_READER_CLASS = "org.apache.pinot.avro.data.readers.AvroRecordReader"; + private static final String DEFAULT_CSV_RECORD_READER_CLASS = "org.apache.pinot.csv.data.readers.CSVRecordReader"; + private static final String DEFAULT_JSON_RECORD_READER_CLASS = "org.apache.pinot.json.data.readers.JSONRecordReader"; + private static final String DEFAULT_THRIFT_RECORD_READER_CLASS = + "org.apache.pinot.thrift.data.readers.ThriftRecordReader"; + private static final String DEFAULT_ORC_RECORD_READER_CLASS = "org.apache.pinot.orc.data.readers.ORCRecordReader"; + private static final String DEFAULT_PARQUET_RECORD_READER_CLASS = + "org.apache.pinot.parquet.data.readers.ParquetRecordReader"; + + static { + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.AVRO, DEFAULT_AVRO_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.GZIPPED_AVRO, DEFAULT_AVRO_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.CSV, DEFAULT_CSV_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.JSON, DEFAULT_JSON_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.THRIFT, DEFAULT_THRIFT_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.ORC, DEFAULT_ORC_RECORD_READER_CLASS); + DEFAULT_RECORD_READER_CLASS_MAP.put(FileFormat.PARQUET, DEFAULT_PARQUET_RECORD_READER_CLASS); + } private RecordReaderFactory() { } + public static RecordReader getRecordReader(String recordReaderClassName, File dataFile, Schema schema, + RecordReaderConfig config) + throws Exception { + RecordReader recordReader = (RecordReader) Class.forName(recordReaderClassName).newInstance(); + recordReader.init(dataFile, schema, config); + return recordReader; + } + + public static RecordReader getRecordReader(FileFormat fileFormat, File dataFile, Schema schema, + RecordReaderConfig config) + throws Exception { + if (DEFAULT_RECORD_READER_CLASS_MAP.containsKey(fileFormat)) { + return getRecordReader(DEFAULT_RECORD_READER_CLASS_MAP.get(fileFormat), dataFile, schema, config); + } + throw new UnsupportedOperationException("No supported RecordReader found for file format - '" + fileFormat + "'"); + } + public static RecordReader getRecordReader(SegmentGeneratorConfig segmentGeneratorConfig) throws Exception { File dataFile = new File(segmentGeneratorConfig.getInputFilePath()); @@ -58,22 +92,16 @@ public class RecordReaderFactory { LOGGER .warn("Using class: {} to read segment, ignoring configured file format: {}", recordReaderPath, fileFormat); } - RecordReader recordReader = (RecordReader) Class.forName(recordReaderPath).newInstance(); - recordReader.init(dataFile, schema, segmentGeneratorConfig.getReaderConfig()); - return recordReader; + return getRecordReader(recordReaderPath, dataFile, schema, segmentGeneratorConfig.getReaderConfig()); } switch (fileFormat) { case AVRO: case GZIPPED_AVRO: - return new AvroRecordReader(dataFile, schema); case CSV: - return new CSVRecordReader(dataFile, schema, (CSVRecordReaderConfig) segmentGeneratorConfig.getReaderConfig()); case JSON: - return new JSONRecordReader(dataFile, schema); case THRIFT: - return new ThriftRecordReader(dataFile, schema, - (ThriftRecordReaderConfig) segmentGeneratorConfig.getReaderConfig()); + return getRecordReader(fileFormat, dataFile, schema, segmentGeneratorConfig.getReaderConfig()); // NOTE: PinotSegmentRecordReader does not support time conversion (field spec must match) case PINOT: return new PinotSegmentRecordReader(dataFile, schema, segmentGeneratorConfig.getColumnSortOrder()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java index 7f73865..a7404c0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java @@ -21,13 +21,11 @@ package org.apache.pinot.core.data.readers; import com.google.common.base.Preconditions; import java.io.File; import java.util.concurrent.TimeUnit; -import org.apache.pinot.avro.data.readers.AvroRecordReader; -import org.apache.pinot.csv.data.readers.CSVRecordReader; -import org.apache.pinot.json.data.readers.JSONRecordReader; +import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; +import org.apache.pinot.spi.data.readers.RecordReader; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -73,9 +71,12 @@ public class RecordReaderSampleDataTest { public void testRecordReaders() throws Exception { CompositeTransformer defaultTransformer = CompositeTransformer.getDefaultTransformer(SCHEMA); - try (AvroRecordReader avroRecordReader = new AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA); - CSVRecordReader csvRecordReader = new CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA, null); - JSONRecordReader jsonRecordReader = new JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA)) { + try (RecordReader avroRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null); + RecordReader csvRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, null); + RecordReader jsonRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, null)) { int numRecords = 0; while (avroRecordReader.hasNext()) { assertTrue(csvRecordReader.hasNext()); @@ -115,10 +116,12 @@ public class RecordReaderSampleDataTest { @Test public void testSameIncomingOutgoing() throws Exception { - try (AvroRecordReader avroRecordReader = new AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING); - CSVRecordReader csvRecordReader = new CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING, - null); JSONRecordReader jsonRecordReader = new JSONRecordReader(JSON_SAMPLE_DATA_FILE, - SCHEMA_SAME_INCOMING_OUTGOING)) { + try (RecordReader avroRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING, null); + RecordReader csvRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING, null); + RecordReader jsonRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_SAME_INCOMING_OUTGOING, null)) { int numRecords = 0; while (avroRecordReader.hasNext()) { assertTrue(csvRecordReader.hasNext()); @@ -144,11 +147,12 @@ public class RecordReaderSampleDataTest { @Test public void testDifferentIncomingOutgoing() throws Exception { - try (AvroRecordReader avroRecordReader = new AvroRecordReader(AVRO_SAMPLE_DATA_FILE, - SCHEMA_DIFFERENT_INCOMING_OUTGOING); - CSVRecordReader csvRecordReader = new CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, - null); JSONRecordReader jsonRecordReader = new JSONRecordReader(JSON_SAMPLE_DATA_FILE, - SCHEMA_DIFFERENT_INCOMING_OUTGOING)) { + try (RecordReader avroRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null); + RecordReader csvRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null); + RecordReader jsonRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null)) { int numRecords = 0; while (avroRecordReader.hasNext()) { assertTrue(csvRecordReader.hasNext()); @@ -177,9 +181,12 @@ public class RecordReaderSampleDataTest { @Test public void testNoIncoming() throws Exception { - try (AvroRecordReader avroRecordReader = new AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING); - CSVRecordReader csvRecordReader = new CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null); - JSONRecordReader jsonRecordReader = new JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING)) { + try (RecordReader avroRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null); + RecordReader csvRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null); + RecordReader jsonRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null)) { int numRecords = 0; while (avroRecordReader.hasNext()) { assertTrue(csvRecordReader.hasNext()); @@ -208,9 +215,12 @@ public class RecordReaderSampleDataTest { @Test public void testNoOutgoing() throws Exception { - try (AvroRecordReader avroRecordReader = new AvroRecordReader(AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING); - CSVRecordReader csvRecordReader = new CSVRecordReader(CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null); - JSONRecordReader jsonRecordReader = new JSONRecordReader(JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING)) { + try (RecordReader avroRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null); + RecordReader csvRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null); + RecordReader jsonRecordReader = RecordReaderFactory + .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null)) { int numRecords = 0; while (avroRecordReader.hasNext()) { assertTrue(csvRecordReader.hasNext()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java index dccfc17..3ac25f9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java @@ -18,24 +18,23 @@ */ package org.apache.pinot.core.indexsegment.mutable; -import org.apache.pinot.json.data.readers.JSONRecordReader; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReader; +import java.io.File; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.pinot.core.data.readers.FileFormat; +import org.apache.pinot.core.data.readers.RecordReaderFactory; import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; import org.apache.pinot.core.segment.index.data.source.ColumnDataSource; import org.apache.pinot.core.segment.index.readers.NullValueVectorReader; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - public class MutableSegmentImplNullValueVectorTest { private static final String PINOT_SCHEMA_FILE_PATH = "data/test_null_value_vector_pinot_schema.json"; @@ -47,17 +46,17 @@ public class MutableSegmentImplNullValueVectorTest { @BeforeClass public void setup() - throws IOException { + throws Exception { URL schemaResourceUrl = this.getClass().getClassLoader().getResource(PINOT_SCHEMA_FILE_PATH); URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE); _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _recordTransformer = CompositeTransformer.getDefaultTransformer(_schema); - File avroFile = new File(dataResourceUrl.getFile()); + File jsonFile = new File(dataResourceUrl.getFile()); _mutableSegmentImpl = MutableSegmentImplTestUtils .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), false, true); GenericRow reuse = new GenericRow(); - try (RecordReader recordReader = new JSONRecordReader(avroFile, _schema)) { + try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, _schema, null)) { while (recordReader.hasNext()) { recordReader.next(reuse); GenericRow transformedRow = _recordTransformer.transform(reuse); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java index f1b6b38..a8d8ac0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java @@ -22,18 +22,14 @@ import java.io.File; import java.net.URL; import java.util.Collections; import org.apache.commons.io.FileUtils; -import org.apache.pinot.avro.data.readers.AvroRecordReader; -import org.apache.pinot.avro.data.readers.AvroUtils; -import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.common.segment.ReadMode; import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.core.common.BlockMultiValIterator; import org.apache.pinot.core.common.BlockSingleValIterator; import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.common.DataSourceMetadata; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.core.data.readers.FileFormat; +import org.apache.pinot.core.data.readers.RecordReaderFactory; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; @@ -42,6 +38,10 @@ import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.segments.v1.creator.SegmentTestUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -77,13 +77,13 @@ public class MutableSegmentImplTest { _schema = config.getSchema(); _mutableSegmentImpl = MutableSegmentImplTestUtils - .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(),false); + .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), + false); _lastIngestionTimeMs = System.currentTimeMillis(); StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs); _startTimeMs = System.currentTimeMillis(); - try (RecordReader recordReader = new AvroRecordReader(avroFile, _schema)) { + try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.AVRO, avroFile, _schema, null)) { GenericRow reuse = new GenericRow(); while (recordReader.hasNext()) { _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata); diff --git a/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java b/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java index 5759f90..977014a 100644 --- a/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java +++ b/pinot-record-readers/pinot-avro/src/main/java/org/apache/pinot/avro/data/readers/AvroRecordReader.java @@ -36,14 +36,18 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils; * Record reader for AVRO file. */ public class AvroRecordReader implements RecordReader { - private final File _dataFile; - private final Schema _schema; - private final List<FieldSpec> _fieldSpecs; + private File _dataFile; + private Schema _schema; + private List<FieldSpec> _fieldSpecs; private DataFileStream<GenericRecord> _avroReader; private GenericRecord _reusableAvroRecord = null; - public AvroRecordReader(File dataFile, Schema schema) + public AvroRecordReader() { + } + + @Override + public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { _dataFile = dataFile; _schema = schema; @@ -58,10 +62,6 @@ public class AvroRecordReader implements RecordReader { } @Override - public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) { - } - - @Override public boolean hasNext() { return _avroReader.hasNext(); } diff --git a/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java b/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java index 9d4e534..ea889fe 100644 --- a/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java +++ b/pinot-record-readers/pinot-avro/src/test/java/org/apache/pinot/avro/data/readers/AvroRecordReaderTest.java @@ -38,7 +38,9 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest { @Override protected RecordReader createRecordReader() throws Exception { - return new AvroRecordReader(_dataFile, getPinotSchema()); + AvroRecordReader avroRecordReader = new AvroRecordReader(); + avroRecordReader.init(_dataFile, getPinotSchema(), null); + return avroRecordReader; } @Override diff --git a/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java b/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java index 2a066a9..df4d2b4 100644 --- a/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java +++ b/pinot-record-readers/pinot-csv/src/main/java/org.apache.pinot.csv.data.readers/CSVRecordReader.java @@ -39,21 +39,25 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils; * Record reader for CSV file. */ public class CSVRecordReader implements RecordReader { - private final File _dataFile; - private final Schema _schema; - private final List<FieldSpec> _fieldSpecs; - private final CSVFormat _format; - private final char _multiValueDelimiter; + private File _dataFile; + private Schema _schema; + private List<FieldSpec> _fieldSpecs; + private CSVFormat _format; + private char _multiValueDelimiter; private CSVParser _parser; private Iterator<CSVRecord> _iterator; - public CSVRecordReader(File dataFile, Schema schema, CSVRecordReaderConfig config) + public CSVRecordReader() { + } + + @Override + public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { _dataFile = dataFile; _schema = schema; _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); - + CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig; if (config == null) { _format = CSVFormat.DEFAULT.withDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).withHeader(); _multiValueDelimiter = CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER; @@ -92,7 +96,6 @@ public class CSVRecordReader implements RecordReader { _format = format; _multiValueDelimiter = config.getMultiValueDelimiter(); } - init(); } @@ -103,10 +106,6 @@ public class CSVRecordReader implements RecordReader { } @Override - public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) { - } - - @Override public boolean hasNext() { return _iterator.hasNext(); } diff --git a/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java b/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java index ad22639..d5524da 100644 --- a/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java +++ b/pinot-record-readers/pinot-csv/src/test/java/org.apache.pinot.csv.data.readers/CSVRecordReaderTest.java @@ -39,7 +39,9 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest { throws Exception { CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); - return new CSVRecordReader(_dataFile, getPinotSchema(), csvRecordReaderConfig); + CSVRecordReader csvRecordReader = new CSVRecordReader(); + csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig); + return csvRecordReader; } @Override diff --git a/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java b/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java index ca6b63f..3d01f1d 100644 --- a/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java +++ b/pinot-record-readers/pinot-json/src/main/java/org.apache.pinot.json.data.readers/JSONRecordReader.java @@ -38,19 +38,13 @@ import org.apache.pinot.spi.utils.JsonUtils; * Record reader for JSON file. */ public class JSONRecordReader implements RecordReader { - private final File _dataFile; - private final Schema _schema; - private final List<FieldSpec> _fieldSpecs; + private File _dataFile; + private Schema _schema; + private List<FieldSpec> _fieldSpecs; private MappingIterator<Map<String, Object>> _iterator; - public JSONRecordReader(File dataFile, Schema schema) - throws IOException { - _dataFile = dataFile; - _schema = schema; - _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); - - init(); + public JSONRecordReader() { } private void init() @@ -65,7 +59,12 @@ public class JSONRecordReader implements RecordReader { } @Override - public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) { + public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + _dataFile = dataFile; + _schema = schema; + _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); + init(); } @Override diff --git a/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java b/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java index d2eabae..df40c77 100644 --- a/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java +++ b/pinot-record-readers/pinot-json/src/test/java/org.apache.pinot.json.data.readers/JSONRecordReaderTest.java @@ -34,7 +34,9 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest { @Override protected RecordReader createRecordReader() throws Exception { - return new JSONRecordReader(_dateFile, getPinotSchema()); + JSONRecordReader recordReader = new JSONRecordReader(); + recordReader.init(_dateFile, getPinotSchema(), null); + return recordReader; } @Override diff --git a/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java b/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java index 5a00aa0..fd16e72 100644 --- a/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java +++ b/pinot-record-readers/pinot-thrift/src/main/java/org.apache.pinot.thrift.data.readers/ThriftRecordReader.java @@ -42,31 +42,17 @@ import org.apache.thrift.transport.TIOStreamTransport; * Record reader for Thrift file. */ public class ThriftRecordReader implements RecordReader { - private final File _dataFile; - private final Schema _schema; - private final List<FieldSpec> _fieldSpecs; - private final Class<?> _thriftClass; - private final Map<String, Integer> _fieldIds = new HashMap<>(); + private File _dataFile; + private Schema _schema; + private List<FieldSpec> _fieldSpecs; + private Class<?> _thriftClass; + private Map<String, Integer> _fieldIds = new HashMap<>(); private InputStream _inputStream; private TProtocol _tProtocol; private boolean _hasNext; - public ThriftRecordReader(File dataFile, Schema schema, ThriftRecordReaderConfig recordReaderConfig) - throws IOException, ClassNotFoundException, IllegalAccessException, InstantiationException { - _dataFile = dataFile; - _schema = schema; - _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); - _thriftClass = Class.forName(recordReaderConfig.getThriftClass()); - TBase tObject = (TBase) _thriftClass.newInstance(); - int index = 1; - TFieldIdEnum tFieldIdEnum; - while ((tFieldIdEnum = tObject.fieldForId(index)) != null) { - _fieldIds.put(tFieldIdEnum.getFieldName(), index); - index++; - } - - init(); + public ThriftRecordReader() { } private void init() @@ -90,7 +76,27 @@ public class ThriftRecordReader implements RecordReader { } @Override - public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) { + public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig config) + throws IOException { + ThriftRecordReaderConfig recordReaderConfig = (ThriftRecordReaderConfig) config; + _dataFile = dataFile; + _schema = schema; + _fieldSpecs = RecordReaderUtils.extractFieldSpecs(schema); + TBase tObject; + try { + _thriftClass = Class.forName(recordReaderConfig.getThriftClass()); + tObject = (TBase) _thriftClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + int index = 1; + TFieldIdEnum tFieldIdEnum; + while ((tFieldIdEnum = tObject.fieldForId(index)) != null) { + _fieldIds.put(tFieldIdEnum.getFieldName(), index); + index++; + } + + init(); } @Override diff --git a/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java b/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java index 54c1446..465d9cf 100644 --- a/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java +++ b/pinot-record-readers/pinot-thrift/src/test/java/org.apache.pinot.thrift.data.readers/ThriftRecordReaderTest.java @@ -93,9 +93,9 @@ public class ThriftRecordReaderTest { @Test public void testReadData() - throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { - ThriftRecordReader recordReader = new ThriftRecordReader(_tempFile, getSchema(), getThriftRecordReaderConfig()); - + throws IOException { + ThriftRecordReader recordReader = new ThriftRecordReader(); + recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig()); List<GenericRow> genericRows = new ArrayList<>(); while (recordReader.hasNext()) { genericRows.add(recordReader.next()); @@ -112,8 +112,9 @@ public class ThriftRecordReaderTest { @Test public void testRewind() - throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException { - ThriftRecordReader recordReader = new ThriftRecordReader(_tempFile, getSchema(), getThriftRecordReaderConfig()); + throws IOException { + ThriftRecordReader recordReader = new ThriftRecordReader(); + recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig()); List<GenericRow> genericRows = new ArrayList<>(); while (recordReader.hasNext()) { genericRows.add(recordReader.next()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
