This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6e0adf6c41 Replace deprecated methods in ParquetNativeRecordReader (#9106)
6e0adf6c41 is described below
commit 6e0adf6c41a02b448400cbe5e457dc90fecd01f9
Author: Kartik Khare <[email protected]>
AuthorDate: Tue Jul 26 15:25:59 2022 +0530
Replace deprecated methods in ParquetNativeRecordReader (#9106)
Co-authored-by: Kartik Khare <[email protected]>
---
.../parquet/ParquetNativeRecordReader.java | 28 ++++++++++++----------
.../plugin/inputformat/parquet/ParquetUtils.java | 8 +++----
2 files changed, 20 insertions(+), 16 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 5448818d90..3f413b9a68 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -24,12 +24,13 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;
@@ -45,32 +46,37 @@ public class ParquetNativeRecordReader implements
RecordReader {
private Path _dataFilePath;
private ParquetNativeRecordExtractor _recordExtractor;
private MessageType _schema;
- private ParquetMetadata _parquetMetadata;
private ParquetFileReader _parquetFileReader;
private Group _nextRecord;
private PageReadStore _pageReadStore;
private MessageColumnIO _columnIO;
private org.apache.parquet.io.RecordReader _parquetRecordReader;
private int _currentPageIdx;
+ private Configuration _hadoopConf;
+ private ParquetReadOptions _parquetReadOptions;
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
_dataFilePath = new Path(dataFile.getAbsolutePath());
- Configuration conf = new Configuration();
- _parquetMetadata = ParquetFileReader.readFooter(conf, _dataFilePath,
ParquetMetadataConverter.NO_FILTER);
+ _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
_recordExtractor = new ParquetNativeRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
- _schema = _parquetMetadata.getFileMetaData().getSchema();
- _parquetFileReader =
- new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(),
_dataFilePath, _parquetMetadata.getBlocks(),
- _schema.getColumns());
+
+ _parquetReadOptions = ParquetReadOptions.builder()
+ .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
+ .build();
+
+ _parquetFileReader =
ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+ _parquetReadOptions);
+ _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
_pageReadStore = _parquetFileReader.readNextRowGroup();
_columnIO = new ColumnIOFactory().getColumnIO(_schema);
_parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new
GroupRecordConverter(_schema));
_currentPageIdx = 0;
}
+
@Override
public boolean hasNext() {
if (_pageReadStore == null) {
@@ -113,10 +119,8 @@ public class ParquetNativeRecordReader implements
RecordReader {
public void rewind()
throws IOException {
_parquetFileReader.close();
- Configuration conf = new Configuration();
- _parquetFileReader =
- new ParquetFileReader(conf, _parquetMetadata.getFileMetaData(),
_dataFilePath, _parquetMetadata.getBlocks(),
- _schema.getColumns());
+ _parquetFileReader =
ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
+ _parquetReadOptions);
_pageReadStore = _parquetFileReader.readNextRowGroup();
_parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new
GroupRecordConverter(_schema));
_currentPageIdx = 0;
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index 5f3dd81909..c3a6bd7a2d 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -49,7 +49,7 @@ public class ParquetUtils {
throws IOException {
//noinspection unchecked
return
AvroParquetReader.<GenericRecord>builder(path).disableCompatibility().withDataModel(GenericData.get())
- .withConf(getParquetAvroReaderConfiguration()).build();
+ .withConf(getParquetHadoopConfiguration()).build();
}
/**
@@ -58,7 +58,7 @@ public class ParquetUtils {
public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path,
Schema schema)
throws IOException {
return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
- .withConf(getParquetAvroReaderConfiguration()).build();
+ .withConf(getParquetHadoopConfiguration()).build();
}
/**
@@ -67,7 +67,7 @@ public class ParquetUtils {
public static Schema getParquetAvroSchema(Path path)
throws IOException {
ParquetMetadata footer =
- ParquetFileReader.readFooter(getParquetAvroReaderConfiguration(),
path, ParquetMetadataConverter.NO_FILTER);
+ ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path,
ParquetMetadataConverter.NO_FILTER);
Map<String, String> metaData =
footer.getFileMetaData().getKeyValueMetaData();
String schemaString = metaData.get("parquet.avro.schema");
if (schemaString == null) {
@@ -82,7 +82,7 @@ public class ParquetUtils {
}
}
- private static Configuration getParquetAvroReaderConfiguration() {
+ public static Configuration getParquetHadoopConfiguration() {
// The file path used in ParquetRecordReader is a local file path without
prefix 'file:///',
// so we have to make sure that the configuration item 'fs.defaultFS' is
set to 'file:///'
// in case that user's hadoop conf overwrite this item
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]