This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 982b128ba3 handle reading empty parquet files (#11532)
982b128ba3 is described below
commit 982b128ba37892589d233dc74dbf11d09c3a7882
Author: Johan Adami <[email protected]>
AuthorDate: Wed Sep 6 20:30:09 2023 -0400
handle reading empty parquet files (#11532)
Co-authored-by: Johan Adami <[email protected]>
---
.../parquet/ParquetNativeRecordReader.java | 24 ++++++++++++++--------
.../parquet/ParquetNativeRecordReaderFullTest.java | 9 ++++++--
.../parquet/ParquetRecordReaderTest.java | 10 +++++++++
3 files changed, 33 insertions(+), 10 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 247bd6c82a..fd3ea1e264 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -53,7 +53,7 @@ public class ParquetNativeRecordReader implements
RecordReader {
private Group _nextRecord;
private PageReadStore _pageReadStore;
private MessageColumnIO _columnIO;
- private org.apache.parquet.io.RecordReader _parquetRecordReader;
+ private org.apache.parquet.io.RecordReader<Group> _parquetRecordReader;
private int _currentPageIdx;
private Configuration _hadoopConf;
private ParquetReadOptions _parquetReadOptions;
@@ -72,10 +72,20 @@ public class ParquetNativeRecordReader implements
RecordReader {
_parquetFileReader =
ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath,
_hadoopConf), _parquetReadOptions);
_schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
- _pageReadStore = _parquetFileReader.readNextRowGroup();
_columnIO = new ColumnIOFactory().getColumnIO(_schema);
- _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new
GroupRecordConverter(_schema));
- _currentPageIdx = 0;
+ init();
+ }
+
+ private void init()
+ throws IOException {
+ _pageReadStore = _parquetFileReader.readNextRowGroup();
+ // If the parquet file is initially empty, then we cannot set up the
_parquetRecordReader.
+ // It's expected a user would always call init() -> hasNext() -> next().
+ // Without this, an empty parquet file will fail to init.
+ if (_pageReadStore != null) {
+ _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new
GroupRecordConverter(_schema));
+ _currentPageIdx = 0;
+ }
}
@Override
@@ -110,7 +120,7 @@ public class ParquetNativeRecordReader implements
RecordReader {
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
- _nextRecord = (Group) _parquetRecordReader.read();
+ _nextRecord = _parquetRecordReader.read();
_recordExtractor.extract(_nextRecord, reuse);
_currentPageIdx++;
return reuse;
@@ -122,9 +132,7 @@ public class ParquetNativeRecordReader implements
RecordReader {
_parquetFileReader.close();
_parquetFileReader =
ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath,
_hadoopConf), _parquetReadOptions);
- _pageReadStore = _parquetFileReader.readNextRowGroup();
- _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new
GroupRecordConverter(_schema));
- _currentPageIdx = 0;
+ init();
}
@Override
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
index 7e2a101a0e..93edf424dd 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
@@ -39,7 +39,7 @@ public class ParquetNativeRecordReaderFullTest {
testParquetFile("test-data/test-null.parquet");
testParquetFile("test-data/test-null-dictionary.parquet");
testParquetFile("test-data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq");
- //testParquetFile("test-data/dir_metadata/empty.parquet");
+ testParquetFile("test-data/dir_metadata/empty.parquet");
testParquetFile("test-data/multi_rgs_pyarrow/b=hi/a97cc141d16f4014a59e5b234dddf07c.parquet");
testParquetFile("test-data/multi_rgs_pyarrow/b=lo/01bc139247874a0aa9e0e541f2eec497.parquet");
for (int i = 1; i < 4; i++) {
@@ -54,7 +54,7 @@ public class ParquetNativeRecordReaderFullTest {
testParquetFile("test-data/customer.impala.parquet");
testParquetFile("test-data/datapage_v2.snappy.parquet");
testParquetFile("test-data/decimals.parquet");
- //testParquetFile("test-data/empty.parquet");
+ testParquetFile("test-data/empty.parquet");
testParquetFile("test-data/foo.parquet");
testParquetFile("test-data/gzip-nation.impala.parquet");
testParquetFile("test-data/map-test.snappy.parquet");
@@ -129,6 +129,11 @@ public class ParquetNativeRecordReaderFullTest {
while (recordReader.hasNext()) {
recordReader.next();
}
+ recordReader.rewind();
+ while (recordReader.hasNext()) {
+ recordReader.next();
+ }
+
recordReader.close();
}
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 78e96203ce..9c861237e1 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -148,6 +148,15 @@ public class ParquetRecordReaderTest extends
AbstractRecordReaderTest {
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+ avroRecordReader.rewind();
+ nativeRecordReader.rewind();
+ testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+ }
+
+ private void testComparison(ParquetRecordReader avroRecordReader,
ParquetRecordReader nativeRecordReader,
+ int totalRecords)
+ throws IOException {
GenericRow avroReuse = new GenericRow();
GenericRow nativeReuse = new GenericRow();
int recordsRead = 0;
@@ -159,6 +168,7 @@ public class ParquetRecordReaderTest extends
AbstractRecordReaderTest {
Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
recordsRead++;
}
+ Assert.assertFalse(nativeRecordReader.hasNext());
Assert.assertEquals(recordsRead, totalRecords,
"Message read from ParquetRecordReader doesn't match the expected
number.");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]