This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 982b128ba3 handle reading empty parquet files (#11532)
982b128ba3 is described below

commit 982b128ba37892589d233dc74dbf11d09c3a7882
Author: Johan Adami <[email protected]>
AuthorDate: Wed Sep 6 20:30:09 2023 -0400

    handle reading empty parquet files (#11532)
    
    Co-authored-by: Johan Adami <[email protected]>
---
 .../parquet/ParquetNativeRecordReader.java         | 24 ++++++++++++++--------
 .../parquet/ParquetNativeRecordReaderFullTest.java |  9 ++++++--
 .../parquet/ParquetRecordReaderTest.java           | 10 +++++++++
 3 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 247bd6c82a..fd3ea1e264 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -53,7 +53,7 @@ public class ParquetNativeRecordReader implements RecordReader {
   private Group _nextRecord;
   private PageReadStore _pageReadStore;
   private MessageColumnIO _columnIO;
-  private org.apache.parquet.io.RecordReader _parquetRecordReader;
+  private org.apache.parquet.io.RecordReader<Group> _parquetRecordReader;
   private int _currentPageIdx;
   private Configuration _hadoopConf;
   private ParquetReadOptions _parquetReadOptions;
@@ -72,10 +72,20 @@ public class ParquetNativeRecordReader implements RecordReader {
     _parquetFileReader =
         ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
     _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
-    _pageReadStore = _parquetFileReader.readNextRowGroup();
     _columnIO = new ColumnIOFactory().getColumnIO(_schema);
-    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
-    _currentPageIdx = 0;
+    init();
+  }
+
+  private void init()
+      throws IOException {
+    _pageReadStore = _parquetFileReader.readNextRowGroup();
+    // If the parquet file is initially empty, then we cannot set up the _parquetRecordReader.
+    // It's expected a user would always call init() -> hasNext() -> next().
+    // Without this, an empty parquet file will fail to init.
+    if (_pageReadStore != null) {
+      _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
+      _currentPageIdx = 0;
+    }
   }
 
   @Override
@@ -110,7 +120,7 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    _nextRecord = (Group) _parquetRecordReader.read();
+    _nextRecord = _parquetRecordReader.read();
     _recordExtractor.extract(_nextRecord, reuse);
     _currentPageIdx++;
     return reuse;
@@ -122,9 +132,7 @@ public class ParquetNativeRecordReader implements RecordReader {
     _parquetFileReader.close();
     _parquetFileReader =
         ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
-    _pageReadStore = _parquetFileReader.readNextRowGroup();
-    _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
-    _currentPageIdx = 0;
+    init();
   }
 
   @Override
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
index 7e2a101a0e..93edf424dd 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderFullTest.java
@@ -39,7 +39,7 @@ public class ParquetNativeRecordReaderFullTest {
     testParquetFile("test-data/test-null.parquet");
     testParquetFile("test-data/test-null-dictionary.parquet");
     testParquetFile("test-data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq");
-    //testParquetFile("test-data/dir_metadata/empty.parquet");
+    testParquetFile("test-data/dir_metadata/empty.parquet");
     testParquetFile("test-data/multi_rgs_pyarrow/b=hi/a97cc141d16f4014a59e5b234dddf07c.parquet");
     testParquetFile("test-data/multi_rgs_pyarrow/b=lo/01bc139247874a0aa9e0e541f2eec497.parquet");
     for (int i = 1; i < 4; i++) {
@@ -54,7 +54,7 @@ public class ParquetNativeRecordReaderFullTest {
     testParquetFile("test-data/customer.impala.parquet");
     testParquetFile("test-data/datapage_v2.snappy.parquet");
     testParquetFile("test-data/decimals.parquet");
-    //testParquetFile("test-data/empty.parquet");
+    testParquetFile("test-data/empty.parquet");
     testParquetFile("test-data/foo.parquet");
     testParquetFile("test-data/gzip-nation.impala.parquet");
     testParquetFile("test-data/map-test.snappy.parquet");
@@ -129,6 +129,11 @@ public class ParquetNativeRecordReaderFullTest {
     while (recordReader.hasNext()) {
       recordReader.next();
     }
+    recordReader.rewind();
+    while (recordReader.hasNext()) {
+      recordReader.next();
+    }
+
     recordReader.close();
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 78e96203ce..9c861237e1 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -148,6 +148,15 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
     Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
 
+    testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+    avroRecordReader.rewind();
+    nativeRecordReader.rewind();
+    testComparison(avroRecordReader, nativeRecordReader, totalRecords);
+  }
+
+  private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader,
+      int totalRecords)
+      throws IOException {
     GenericRow avroReuse = new GenericRow();
     GenericRow nativeReuse = new GenericRow();
     int recordsRead = 0;
@@ -159,6 +168,7 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
       Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
       recordsRead++;
     }
+    Assert.assertFalse(nativeRecordReader.hasNext());
     Assert.assertEquals(recordsRead, totalRecords,
         "Message read from ParquetRecordReader doesn't match the expected number.");
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to