This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new ff36d6bca PARQUET-2365 : Fixes NPE when rewriting column without
column index (#1173)
ff36d6bca is described below
commit ff36d6bca13e0c626dd3c7932d94fad2ee2c9214
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Nov 4 18:41:24 2023 +0800
PARQUET-2365 : Fixes NPE when rewriting column without column index (#1173)
---
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 38 +++++++------
.../apache/parquet/hadoop/ParquetFileWriter.java | 49 +++++++++++-----
.../parquet/hadoop/rewrite/ParquetRewriter.java | 22 ++++++++
.../hadoop/rewrite/ParquetRewriterTest.java | 66 +++++++++++-----------
.../parquet/hadoop/util/TestFileBuilder.java | 26 ++++++++-
5 files changed, 137 insertions(+), 64 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index cd6ec5d93..54ce829c0 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -211,14 +211,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
this.totalValueCount += valueCount;
this.pageCount += 1;
- // Copying the statistics if it is not initialized yet so we have the
correct typed one
- if (totalStatistics == null) {
- totalStatistics = statistics.copy();
- } else {
- totalStatistics.mergeStatistics(statistics);
- }
-
- columnIndexBuilder.add(statistics);
+ mergeColumnStatistics(statistics);
offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() +
compressedSize), rowCount);
// by concatenating before collecting instead of collecting twice,
@@ -298,14 +291,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
this.totalValueCount += valueCount;
this.pageCount += 1;
- // Copying the statistics if it is not initialized yet so we have the
correct typed one
- if (totalStatistics == null) {
- totalStatistics = statistics.copy();
- } else {
- totalStatistics.mergeStatistics(statistics);
- }
-
- columnIndexBuilder.add(statistics);
+ mergeColumnStatistics(statistics);
offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() +
compressedSize), rowCount);
// by concatenating before collecting instead of collecting twice,
@@ -329,6 +315,26 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
return (int)size;
}
+ private void mergeColumnStatistics(Statistics<?> statistics) {
+ if (totalStatistics != null && totalStatistics.isEmpty()) {
+ return;
+ }
+
+ if (statistics == null || statistics.isEmpty()) {
+ // The column index and statistics should be invalid if some page
statistics are null or empty.
+ // See PARQUET-2365 for more details
+ totalStatistics =
Statistics.getBuilderForReading(path.getPrimitiveType()).build();
+ columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+ } else if (totalStatistics == null) {
+ // Copying the statistics if it is not initialized yet, so we have the
correct typed one
+ totalStatistics = statistics.copy();
+ columnIndexBuilder.add(statistics);
+ } else {
+ totalStatistics.mergeStatistics(statistics);
+ columnIndexBuilder.add(statistics);
+ }
+ }
+
@Override
public long getMemSize() {
return buf.size();
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 146ffd496..42ea6d921 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -722,14 +722,7 @@ public class ParquetFileWriter {
LOG.debug("{}: write data page content {}", out.getPos(),
compressedPageSize);
bytes.writeAllTo(out);
- // Copying the statistics if it is not initialized yet so we have the
correct typed one
- if (currentStatistics == null) {
- currentStatistics = statistics.copy();
- } else {
- currentStatistics.mergeStatistics(statistics);
- }
-
- columnIndexBuilder.add(statistics);
+ mergeColumnStatistics(statistics);
encodingStatsBuilder.addDataEncoding(valuesEncoding);
currentEncodings.add(rlEncoding);
@@ -867,13 +860,8 @@ public class ParquetFileWriter {
this.uncompressedLength += uncompressedSize + headersSize;
this.compressedLength += compressedSize + headersSize;
- if (currentStatistics == null) {
- currentStatistics = statistics.copy();
- } else {
- currentStatistics.mergeStatistics(statistics);
- }
+ mergeColumnStatistics(statistics);
- columnIndexBuilder.add(statistics);
currentEncodings.add(dataEncoding);
encodingStatsBuilder.addDataEncoding(dataEncoding);
@@ -988,6 +976,19 @@ public class ParquetFileWriter {
endColumn();
}
+ /**
+ * Overwrite the column total statistics. This is specifically used when the
column total statistics
+ * are known while all the page statistics are invalid, for example when
rewriting the column.
+ *
+ * @param totalStatistics the column total statistics
+ */
+ public void invalidateStatistics(Statistics<?> totalStatistics) {
+ Preconditions.checkArgument(totalStatistics != null, "Column total
statistics can not be null");
+ currentStatistics = totalStatistics;
+ // Invalidate the ColumnIndex
+ columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+ }
+
/**
* end a column (once all rep, def and data have been written)
* @throws IOException if there is an error while writing
@@ -1317,6 +1318,26 @@ public class ParquetFileWriter {
return (int)size;
}
+ private void mergeColumnStatistics(Statistics<?> statistics) {
+ if (currentStatistics != null && currentStatistics.isEmpty()) {
+ return;
+ }
+
+ if (statistics == null || statistics.isEmpty()) {
+ // The column index and statistics should be invalid if some page
statistics are null or empty.
+ // See PARQUET-2365 for more details
+ currentStatistics =
Statistics.getBuilderForReading(currentChunkType).build();
+ columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+ } else if (currentStatistics == null) {
+ // Copying the statistics if it is not initialized yet, so we have the
correct typed one
+ currentStatistics = statistics.copy();
+ columnIndexBuilder.add(statistics);
+ } else {
+ currentStatistics.mergeStatistics(statistics);
+ columnIndexBuilder.add(statistics);
+ }
+ }
+
private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 2eaaab18c..bf2155e28 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -392,6 +392,7 @@ public class ParquetRewriter implements Closeable {
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics<?> statistics = null;
+ boolean isColumnStatisticsMalformed = false;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
@@ -439,6 +440,14 @@ public class ParquetRewriter implements Closeable {
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(),
headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+ if (statistics == null) {
+ // Reaching here means both the columnIndex and the page header
statistics are null
+ isColumnStatisticsMalformed = true;
+ } else {
+ Preconditions.checkState(
+ !isColumnStatisticsMalformed,
+ "Detected mixed null page statistics and non-null page
statistics");
+ }
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1 + offsetIndex.getLastRowIndex(
@@ -490,6 +499,14 @@ public class ParquetRewriter implements Closeable {
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(),
headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+ if (statistics == null) {
+ // Reaching here means both the columnIndex and the page header
statistics are null
+ isColumnStatisticsMalformed = true;
+ } else {
+ Preconditions.checkState(
+ !isColumnStatisticsMalformed,
+ "Detected mixed null page statistics and non-null page
statistics");
+ }
readValues += headerV2.getNum_values();
writer.writeDataPageV2(headerV2.getNum_rows(),
headerV2.getNum_nulls(),
@@ -509,6 +526,11 @@ public class ParquetRewriter implements Closeable {
break;
}
}
+
+ if (isColumnStatisticsMalformed) {
+ // All the column statistics are invalid, so we need to overwrite the
column statistics
+ writer.invalidateStatistics(chunk.getStatistics());
+ }
}
private Statistics<?> convertStatistics(String createdBy,
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 6ce7e2c91..ab25566a9 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -80,6 +80,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -127,17 +129,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema are not changed for the columns not pruned
- ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
- MessageType schema = pmd.getFileMetaData().getSchema();
- List<Type> fields = schema.getFields();
- assertEquals(fields.size(), 3);
- assertEquals(fields.get(0).getName(), "DocId");
- assertEquals(fields.get(1).getName(), "Name");
- assertEquals(fields.get(2).getName(), "Links");
- List<Type> subFields = fields.get(2).asGroupType().getFields();
- assertEquals(subFields.size(), 2);
- assertEquals(subFields.get(0).getName(), "Backward");
- assertEquals(subFields.get(1).getName(), "Forward");
+ validateSchema();
// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -199,17 +191,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema are not changed for the columns not pruned
- ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
- MessageType schema = pmd.getFileMetaData().getSchema();
- List<Type> fields = schema.getFields();
- assertEquals(fields.size(), 3);
- assertEquals(fields.get(0).getName(), "DocId");
- assertEquals(fields.get(1).getName(), "Name");
- assertEquals(fields.get(2).getName(), "Links");
- List<Type> subFields = fields.get(2).asGroupType().getFields();
- assertEquals(subFields.size(), 2);
- assertEquals(subFields.get(0).getName(), "Backward");
- assertEquals(subFields.get(1).getName(), "Forward");
+ validateSchema();
// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -276,17 +258,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema are not changed for the columns not pruned
- ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
- MessageType schema = pmd.getFileMetaData().getSchema();
- List<Type> fields = schema.getFields();
- assertEquals(fields.size(), 3);
- assertEquals(fields.get(0).getName(), "DocId");
- assertEquals(fields.get(1).getName(), "Name");
- assertEquals(fields.get(2).getName(), "Links");
- List<Type> subFields = fields.get(2).asGroupType().getFields();
- assertEquals(subFields.size(), 2);
- assertEquals(subFields.get(0).getName(), "Backward");
- assertEquals(subFields.get(1).getName(), "Forward");
+ validateSchema();
// Verify codec has been translated
FileDecryptionProperties fileDecryptionProperties =
EncDecProperties.getFileDecryptionProperties();
@@ -672,6 +644,8 @@ public class ParquetRewriterTest {
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
@@ -713,6 +687,16 @@ public class ParquetRewriterTest {
expectGroup.getBinary("Gender", 0).getBytes());
}
+ if (!prunePaths.contains("FloatFraction") &&
!nullifiedPaths.contains("FloatFraction")) {
+ assertEquals(group.getFloat("FloatFraction", 0),
+ expectGroup.getFloat("FloatFraction", 0), 0);
+ }
+
+ if (!prunePaths.contains("DoubleFraction") &&
!nullifiedPaths.contains("DoubleFraction")) {
+ assertEquals(group.getDouble("DoubleFraction", 0),
+ expectGroup.getDouble("DoubleFraction", 0), 0);
+ }
+
Group subGroup = group.getGroup("Links", 0);
if (!prunePaths.contains("Links.Backward") &&
!nullifiedPaths.contains("Links.Backward")) {
@@ -949,4 +933,20 @@ public class ParquetRewriterTest {
return allBloomFilters;
}
+
+ private void validateSchema() throws IOException {
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ List<Type> fields = schema.getFields();
+ assertEquals(fields.size(), 5);
+ assertEquals(fields.get(0).getName(), "DocId");
+ assertEquals(fields.get(1).getName(), "Name");
+ assertEquals(fields.get(2).getName(), "FloatFraction");
+ assertEquals(fields.get(3).getName(), "DoubleFraction");
+ assertEquals(fields.get(4).getName(), "Links");
+ List<Type> subFields = fields.get(4).asGroupType().getFields();
+ assertEquals(subFields.size(), 2);
+ assertEquals(subFields.get(0).getName(), "Backward");
+ assertEquals(subFields.get(1).getName(), "Forward");
+ }
}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 951899bb3..9eef65d7f 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -36,6 +36,8 @@ import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -175,7 +177,13 @@ public class TestFileBuilder
else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
g.add(type.getName(), getString());
}
- // Only support 3 types now, more can be added later
+ else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+ g.add(type.getName(), getFloat());
+ }
+ else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+ g.add(type.getName(), getDouble());
+ }
+ // Only support 5 types now, more can be added later
}
else {
GroupType groupType = (GroupType) type;
@@ -206,6 +214,22 @@ public class TestFileBuilder
return sb.toString();
}
+ private static float getFloat()
+ {
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ return Float.NaN;
+ }
+ return ThreadLocalRandom.current().nextFloat();
+ }
+
+ private static double getDouble()
+ {
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ return Double.NaN;
+ }
+ return ThreadLocalRandom.current().nextDouble();
+ }
+
public static String createTempFile(String prefix)
{
try {