This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9912b47 Refactor in preparation for the ORCRecordReader (#3900)
9912b47 is described below
commit 9912b47959c1a9c0915588b4ac60f8ca836a8e27
Author: Jennifer Dai <[email protected]>
AuthorDate: Thu Mar 7 14:31:38 2019 -0800
Refactor in preparation for the ORCRecordReader (#3900)
* Refactor RecordReader to allow customized RecordReader
configuration during segment creation.
---
.../pinot/core/data/readers/AvroRecordReader.java | 6 ++++++
.../pinot/core/data/readers/CSVRecordReader.java | 6 ++++++
.../apache/pinot/core/data/readers/FileFormat.java | 2 +-
.../core/data/readers/GenericRowRecordReader.java | 6 ++++++
.../pinot/core/data/readers/JSONRecordReader.java | 7 +++++++
.../readers/MultiplePinotSegmentRecordReader.java | 6 ++++++
.../core/data/readers/PinotSegmentRecordReader.java | 6 ++++++
.../pinot/core/data/readers/RecordReader.java | 6 ++++++
.../core/data/readers/RecordReaderFactory.java | 21 +++++++++++++++++++++
.../pinot/core/data/readers/ThriftRecordReader.java | 6 ++++++
.../generator/SegmentGeneratorConfig.java | 10 ++++++++++
.../pinot/core/minion/BackfillDateTimeColumn.java | 5 +++++
.../org/apache/pinot/core/minion/SegmentPurger.java | 5 +++++
.../core/minion/segment/MapperRecordReader.java | 6 ++++++
.../core/minion/segment/ReducerRecordReader.java | 6 ++++++
.../converter/RealtimeSegmentRecordReader.java | 6 ++++++
.../apache/pinot/hadoop/job/JobConfigConstants.java | 3 +++
.../hadoop/job/mapper/SegmentCreationMapper.java | 14 +++++++++++---
18 files changed, 123 insertions(+), 4 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index d95034b..c5ca4cc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.util.AvroUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +61,11 @@ public class AvroRecordReader implements RecordReader {
}
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
private void validateSchema() {
org.apache.avro.Schema avroSchema = _avroReader.getSchema();
for (FieldSpec fieldSpec : _fieldSpecs) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
index 6d00351..00cce5d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/CSVRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -92,6 +93,11 @@ public class CSVRecordReader implements RecordReader {
init();
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
private void init()
throws IOException {
_parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 0826cd5..5f7120d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
package org.apache.pinot.core.data.readers;
public enum FileFormat {
- AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT
+ AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER // OTHER: reader is supplied via a record reader path, not derived from the format
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index 8fb1bbd..5615f3f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -41,6 +42,11 @@ public class GenericRowRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
return _nextRowId < _numRows;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
index bbe786a..23ec05b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/JSONRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -43,6 +44,7 @@ public class JSONRecordReader implements RecordReader {
private JsonParser _parser;
private Iterator<Map> _iterator;
+
public JSONRecordReader(File dataFile, Schema schema)
throws IOException {
_dataFile = dataFile;
@@ -64,6 +66,11 @@ public class JSONRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
return _iterator.hasNext();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 9489719..6448a63 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -53,6 +54,11 @@ public class MultiplePinotSegmentRecordReader implements
RecordReader {
this(indexDirs, null, null);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
/**
* Read records using the passed in schema and in the order of sorted column
from multiple pinot segments.
* <p>Passed in schema must be a subset of the segment schema.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index bd3fd6f..50c1808 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.sort.PinotSegmentSorter;
import org.apache.pinot.core.data.readers.sort.SegmentSorter;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -59,6 +60,11 @@ public class PinotSegmentRecordReader implements
RecordReader {
this(indexDir, null, null);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
/**
* Read records using the segment schema with the given schema and sort order
* <p>Passed in schema must be a subset of the segment schema.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
index bb57fe7..338b91b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReader.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -33,6 +34,11 @@ import org.apache.pinot.core.data.GenericRow;
public interface RecordReader extends Closeable {
/**
+ * Initializes the record reader from the segment generator config; RecordReaderFactory calls this on readers it instantiates from the config's record reader path.
+ */
+ void init(SegmentGeneratorConfig segmentGeneratorConfig);
+
+ /**
* Return <code>true</code> if more records remain to be read.
*/
boolean hasNext();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
index 7867005..7909905 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/RecordReaderFactory.java
@@ -22,9 +22,14 @@ import com.google.common.base.Preconditions;
import java.io.File;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RecordReaderFactory {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RecordReaderFactory.class);
+
private RecordReaderFactory() {
}
@@ -35,6 +40,22 @@ public class RecordReaderFactory {
Schema schema = segmentGeneratorConfig.getSchema();
FileFormat fileFormat = segmentGeneratorConfig.getFormat();
+ String recordReaderPath = segmentGeneratorConfig.getRecordReaderPath();
+
+ // Allow for instantiation general record readers from a record reader
path passed into segment generator config
+ // If this is set, this will override the file format
+ if (recordReaderPath != null) {
+ if (fileFormat != FileFormat.OTHER) {
+ // We currently have default file format set to AVRO inside segment
generator config,
+ // do not want to break this behavior for clients.
+ LOGGER.warn("Using recordReaderPath {} to read segment, ignoring
fileformat {}",
+ recordReaderPath, fileFormat.toString());
+ }
+ RecordReader recordReader = (RecordReader)
Class.forName(recordReaderPath).newInstance();
+ recordReader.init(segmentGeneratorConfig);
+ return recordReader;
+ }
+
switch (fileFormat) {
case AVRO:
case GZIPPED_AVRO:
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
index b19ca2a..1f24010 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/ThriftRecordReader.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -77,6 +78,11 @@ public class ThriftRecordReader implements RecordReader {
}
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
private boolean hasMoreToRead()
throws IOException {
_inputStream.mark(1);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 6a5020b..bc176bd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -83,6 +83,7 @@ public class SegmentGeneratorConfig {
private String _dataDir = null;
private String _inputFilePath = null;
private FileFormat _format = FileFormat.AVRO;
+ private String _recordReaderPath = null;
private String _outDir = null;
private boolean _overwrite = false;
private String _tableName = null;
@@ -156,6 +157,7 @@ public class SegmentGeneratorConfig {
_timeColumnType = config._timeColumnType;
_simpleDateFormat = config._simpleDateFormat;
_onHeap = config._onHeap;
+ _recordReaderPath = config._recordReaderPath;
}
/**
@@ -323,6 +325,14 @@ public class SegmentGeneratorConfig {
_format = format;
}
+ public String getRecordReaderPath() {
+ return _recordReaderPath;
+ }
+ // A non-null record reader path (a RecordReader class name) makes RecordReaderFactory instantiate that class instead of using the file format.
+ public void setRecordReaderPath(String recordReaderPath) {
+ _recordReaderPath = recordReaderPath;
+ }
+
public String getOutDir() {
return _outDir;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
index ead3305..9d786b0 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
@@ -136,6 +136,11 @@ public class BackfillDateTimeColumn {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
return _baseRecordReader.hasNext();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 90e32d3..9581310 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -151,6 +151,11 @@ public class SegmentPurger {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
if (_recordPurger == null) {
return _recordReader.hasNext();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 609a1f2..42abbd3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -54,6 +55,11 @@ public class MapperRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
if (_finished) {
return false;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 2fbccb6..80e6507 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.data.readers.RecordReader;
import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
@@ -51,6 +52,11 @@ public class ReducerRecordReader implements RecordReader {
}
@Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
+ @Override
public boolean hasNext() {
if (_finished) {
return false;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index dc35c1d..235ed00 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.converter;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -49,6 +50,11 @@ public class RealtimeSegmentRecordReader implements
RecordReader {
_sortedDocIdIterationOrder =
realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn);
}
+ @Override
+ public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+ // No-op: this reader takes no settings from the SegmentGeneratorConfig.
+ }
+
public int[] getSortedDocIdIterationOrder() {
return _sortedDocIdIterationOrder;
}
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index 1a50c5c..35557b5 100644
---
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -49,4 +49,7 @@ public class JobConfigConstants {
public static final String PUSH_TO_PORT = "push.to.port";
public static final String DEFAULT_PERMISSIONS_MASK =
"fs.permissions.umask-mode";
+
+ // Fully qualified class name of a custom RecordReader; when set, segment creation instantiates it instead of picking a reader by input file format
+ public static final String RECORD_READER_PATH = "record.reader.path";
}
diff --git
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index 705e1ad..21f3f16 100644
---
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -69,6 +69,7 @@ public class SegmentCreationMapper extends
Mapper<LongWritable, Text, LongWritab
// Optional
protected TableConfig _tableConfig;
+ protected String _recordReaderPath;
protected Path _readerConfigFile;
// HDFS segment tar directory
@@ -101,6 +102,8 @@ public class SegmentCreationMapper extends
Mapper<LongWritable, Text, LongWritab
_readerConfigFile = new Path(readerConfigFile);
}
+ _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH,
null);
+
// Set up segment name generator
String segmentNameGeneratorType =
_jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE,
JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
@@ -204,9 +207,14 @@ public class SegmentCreationMapper extends
Mapper<LongWritable, Text, LongWritab
segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
segmentGeneratorConfig.setSequenceId(sequenceId);
- FileFormat fileFormat = getFileFormat(inputFileName);
- segmentGeneratorConfig.setFormat(fileFormat);
- segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+ segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
+ if (_recordReaderPath != null) {
+ segmentGeneratorConfig.setFormat(FileFormat.OTHER);
+ } else {
+ FileFormat fileFormat = getFileFormat(inputFileName);
+ segmentGeneratorConfig.setFormat(fileFormat);
+ segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
+ }
segmentGeneratorConfig.setOnHeap(true);
addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig,
hdfsInputFile, sequenceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]