This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa8f94ba PARQUET-2219: ParquetFileReader skips empty row group (#1018)
2fa8f94ba is described below
commit 2fa8f94baeef538b460be38e3c933249d8a27675
Author: Gang Wu <[email protected]>
AuthorDate: Sun Jan 15 08:50:21 2023 +0800
PARQUET-2219: ParquetFileReader skips empty row group (#1018)
* PARQUET-2219: ParquetFileReader skips empty row group
The Parquet specification does not forbid empty row groups, and some
implementations are able to generate files that contain them.
This commit makes ParquetFileReader robust by skipping
empty row groups while reading.
* Test readNextFilteredRowGroup() and
  add a test file for empty blocks next to each other
* print file path in the warning
---
.../parquet/hadoop/ParquetEmptyBlockException.java | 41 +++++
.../apache/parquet/hadoop/ParquetFileReader.java | 21 ++-
.../hadoop/TestParquetReaderEmptyBlock.java | 170 +++++++++++++++++++++
.../test/resources/test-empty-row-group_1.parquet | Bin 0 -> 191 bytes
.../test/resources/test-empty-row-group_2.parquet | Bin 0 -> 675 bytes
.../test/resources/test-empty-row-group_3.parquet | Bin 0 -> 781 bytes
6 files changed, 226 insertions(+), 6 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetEmptyBlockException.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetEmptyBlockException.java
new file mode 100644
index 000000000..249d4cb09
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetEmptyBlockException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+public class ParquetEmptyBlockException extends ParquetRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public ParquetEmptyBlockException() {
+ }
+
+ public ParquetEmptyBlockException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ParquetEmptyBlockException(String message) {
+ super(message);
+ }
+
+ public ParquetEmptyBlockException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 5a798a524..8f5117502 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -52,8 +52,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
@@ -927,7 +925,15 @@ public class ParquetFileReader implements Closeable {
* @return the PageReadStore which can provide PageReaders for each column.
*/
public PageReadStore readNextRowGroup() throws IOException {
- ColumnChunkPageReadStore rowGroup = internalReadRowGroup(currentBlock);
+ ColumnChunkPageReadStore rowGroup = null;
+ try {
+ rowGroup = internalReadRowGroup(currentBlock);
+ } catch (ParquetEmptyBlockException e) {
+ LOG.warn("Read empty block at index {} from {}", currentBlock,
getFile());
+ advanceToNextBlock();
+ return readNextRowGroup();
+ }
+
if (rowGroup == null) {
return null;
}
@@ -948,7 +954,7 @@ public class ParquetFileReader implements Closeable {
}
BlockMetaData block = blocks.get(blockIndex);
if (block.getRowCount() == 0) {
- throw new RuntimeException("Illegal row group of 0 rows");
+ throw new ParquetEmptyBlockException("Illegal row group of 0 rows");
}
ColumnChunkPageReadStore rowGroup = new
ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
@@ -1001,7 +1007,7 @@ public class ParquetFileReader implements Closeable {
BlockMetaData block = blocks.get(blockIndex);
if (block.getRowCount() == 0) {
- throw new RuntimeException("Illegal row group of 0 rows");
+ throw new ParquetEmptyBlockException("Illegal row group of 0 rows");
}
RowRanges rowRanges = getRowRanges(blockIndex);
@@ -1038,7 +1044,10 @@ public class ParquetFileReader implements Closeable {
}
BlockMetaData block = blocks.get(currentBlock);
if (block.getRowCount() == 0L) {
- throw new RuntimeException("Illegal row group of 0 rows");
+ LOG.warn("Read empty block at index {} from {}", currentBlock,
getFile());
+ // Skip the empty block
+ advanceToNextBlock();
+ return readNextFilteredRowGroup();
}
RowRanges rowRanges = getRowRanges(currentBlock);
long rowCount = rowRanges.rowCount();
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderEmptyBlock.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderEmptyBlock.java
new file mode 100644
index 000000000..d6f8c2f1f
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderEmptyBlock.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
+
+public class TestParquetReaderEmptyBlock {
+
+ // The parquet file contains only one empty row group
+ private static final Path EMPTY_BLOCK_FILE_1 =
createPathFromCP("/test-empty-row-group_1.parquet");
+
+ // The parquet file contains three row groups, the second one is empty
+ private static final Path EMPTY_BLOCK_FILE_2 =
createPathFromCP("/test-empty-row-group_2.parquet");
+
+ // The parquet file contains four row groups, the second one and third one are empty
+ private static final Path EMPTY_BLOCK_FILE_3 =
createPathFromCP("/test-empty-row-group_3.parquet");
+
+ private static Path createPathFromCP(String path) {
+ try {
+ return new
Path(TestParquetReaderEmptyBlock.class.getResource(path).toURI());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testReadOnlyEmptyBlock() throws IOException {
+ Configuration conf = new Configuration();
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_1, conf);
+
+ // The parquet file contains only one empty row group
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile,
options, inputFile.newStream());
+ Assert.assertEquals(1, readFooter.getBlocks().size());
+
+ // The empty block is skipped via readNextRowGroup()
+ try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
+ Assert.assertNull(r.readNextRowGroup());
+ }
+
+ // The empty block is skipped via readNextFilteredRowGroup()
+ FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+ .copy(options)
+ .withRecordFilter(filter)
+ .useStatsFilter(true)
+ .build();
+ try (ParquetFileReader r = new ParquetFileReader(inputFile,
filterOptions)) {
+ Assert.assertNull(r.readNextFilteredRowGroup());
+ }
+ }
+
+ @Test
+ public void testSkipEmptyBlock() throws IOException {
+ Configuration conf = new Configuration();
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_2, conf);
+
+ // The parquet file contains three row groups, the second one is empty
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile,
options, inputFile.newStream());
+ Assert.assertEquals(3, readFooter.getBlocks().size());
+
+ // Second row group is empty and skipped via readNextRowGroup()
+ try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
+ PageReadStore pages = null;
+ pages = r.readNextRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(1, pages.getRowCount());
+
+ pages = r.readNextRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(3, pages.getRowCount());
+
+ pages = r.readNextRowGroup();
+ Assert.assertNull(pages);
+ }
+
+ // Only the last row group is read via readNextFilteredRowGroup()
+ FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+ .copy(options)
+ .withRecordFilter(filter)
+ .useStatsFilter(true)
+ .build();
+ try (ParquetFileReader r = new ParquetFileReader(inputFile,
filterOptions)) {
+ PageReadStore pages = null;
+ pages = r.readNextFilteredRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(3, pages.getRowCount());
+
+ pages = r.readNextFilteredRowGroup();
+ Assert.assertNull(pages);
+ }
+ }
+
+ @Test
+ public void testSkipEmptyBlocksNextToEachOther() throws IOException {
+ Configuration conf = new Configuration();
+ ParquetReadOptions options = ParquetReadOptions.builder().build();
+ InputFile inputFile = HadoopInputFile.fromPath(EMPTY_BLOCK_FILE_3, conf);
+
+ // The parquet file contains four row groups, the second one and third one are empty
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(inputFile,
options, inputFile.newStream());
+ Assert.assertEquals(4, readFooter.getBlocks().size());
+
+ // Second and third row groups are empty and skipped via readNextRowGroup()
+ try (ParquetFileReader r = new ParquetFileReader(inputFile, options)) {
+ PageReadStore pages = null;
+ pages = r.readNextRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(1, pages.getRowCount());
+
+ pages = r.readNextRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(4, pages.getRowCount());
+
+ pages = r.readNextRowGroup();
+ Assert.assertNull(pages);
+ }
+
+ // Only the last row group is read via readNextFilteredRowGroup()
+ FilterCompat.Filter filter = FilterCompat.get(gt(intColumn("a"), 1));
+ ParquetReadOptions filterOptions = ParquetReadOptions.builder()
+ .copy(options)
+ .withRecordFilter(filter)
+ .useStatsFilter(true)
+ .build();
+ try (ParquetFileReader r = new ParquetFileReader(inputFile,
filterOptions)) {
+ PageReadStore pages = null;
+ pages = r.readNextFilteredRowGroup();
+ Assert.assertNotNull(pages);
+ Assert.assertEquals(4, pages.getRowCount());
+
+ pages = r.readNextFilteredRowGroup();
+ Assert.assertNull(pages);
+ }
+ }
+
+}
diff --git a/parquet-hadoop/src/test/resources/test-empty-row-group_1.parquet
b/parquet-hadoop/src/test/resources/test-empty-row-group_1.parquet
new file mode 100644
index 000000000..ac8c2dcff
Binary files /dev/null and
b/parquet-hadoop/src/test/resources/test-empty-row-group_1.parquet differ
diff --git a/parquet-hadoop/src/test/resources/test-empty-row-group_2.parquet
b/parquet-hadoop/src/test/resources/test-empty-row-group_2.parquet
new file mode 100644
index 000000000..56fe96fed
Binary files /dev/null and
b/parquet-hadoop/src/test/resources/test-empty-row-group_2.parquet differ
diff --git a/parquet-hadoop/src/test/resources/test-empty-row-group_3.parquet
b/parquet-hadoop/src/test/resources/test-empty-row-group_3.parquet
new file mode 100644
index 000000000..7efd8a81a
Binary files /dev/null and
b/parquet-hadoop/src/test/resources/test-empty-row-group_3.parquet differ