This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new ff36d6bca PARQUET-2365 : Fixes NPE when rewriting column without column index (#1173)
ff36d6bca is described below

commit ff36d6bca13e0c626dd3c7932d94fad2ee2c9214
Author: Xianyang Liu <[email protected]>
AuthorDate: Sat Nov 4 18:41:24 2023 +0800

    PARQUET-2365 : Fixes NPE when rewriting column without column index (#1173)
---
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  | 38 +++++++------
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 49 +++++++++++-----
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 22 ++++++++
 .../hadoop/rewrite/ParquetRewriterTest.java        | 66 +++++++++++-----------
 .../parquet/hadoop/util/TestFileBuilder.java       | 26 ++++++++-
 5 files changed, 137 insertions(+), 64 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index cd6ec5d93..54ce829c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -211,14 +211,7 @@ public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWri
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      // Copying the statistics if it is not initialized yet so we have the 
correct typed one
-      if (totalStatistics == null) {
-        totalStatistics = statistics.copy();
-      } else {
-        totalStatistics.mergeStatistics(statistics);
-      }
-
-      columnIndexBuilder.add(statistics);
+      mergeColumnStatistics(statistics);
       offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + 
compressedSize), rowCount);
 
       // by concatenating before collecting instead of collecting twice,
@@ -298,14 +291,7 @@ public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWri
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      // Copying the statistics if it is not initialized yet so we have the 
correct typed one
-      if (totalStatistics == null) {
-        totalStatistics = statistics.copy();
-      } else {
-        totalStatistics.mergeStatistics(statistics);
-      }
-
-      columnIndexBuilder.add(statistics);
+      mergeColumnStatistics(statistics);
       offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + 
compressedSize), rowCount);
 
       // by concatenating before collecting instead of collecting twice,
@@ -329,6 +315,26 @@ public class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWri
       return (int)size;
     }
 
+    private void mergeColumnStatistics(Statistics<?> statistics) {
+      if (totalStatistics != null && totalStatistics.isEmpty()) {
+        return;
+      }
+
+      if (statistics == null || statistics.isEmpty()) {
+        // The column index and statistics should be invalid if some page 
statistics are null or empty.
+        // See PARQUET-2365 for more details
+        totalStatistics = 
Statistics.getBuilderForReading(path.getPrimitiveType()).build();
+        columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+      } else if (totalStatistics == null) {
+        // Copying the statistics if it is not initialized yet, so we have the 
correct typed one
+        totalStatistics = statistics.copy();
+        columnIndexBuilder.add(statistics);
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+        columnIndexBuilder.add(statistics);
+      }
+    }
+
     @Override
     public long getMemSize() {
       return buf.size();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 146ffd496..42ea6d921 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -722,14 +722,7 @@ public class ParquetFileWriter {
     LOG.debug("{}: write data page content {}", out.getPos(), 
compressedPageSize);
     bytes.writeAllTo(out);
 
-    // Copying the statistics if it is not initialized yet so we have the 
correct typed one
-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
-
-    columnIndexBuilder.add(statistics);
+    mergeColumnStatistics(statistics);
 
     encodingStatsBuilder.addDataEncoding(valuesEncoding);
     currentEncodings.add(rlEncoding);
@@ -867,13 +860,8 @@ public class ParquetFileWriter {
     this.uncompressedLength += uncompressedSize + headersSize;
     this.compressedLength += compressedSize + headersSize;
 
-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
+    mergeColumnStatistics(statistics);
 
-    columnIndexBuilder.add(statistics);
     currentEncodings.add(dataEncoding);
     encodingStatsBuilder.addDataEncoding(dataEncoding);
 
@@ -988,6 +976,19 @@ public class ParquetFileWriter {
     endColumn();
   }
 
+  /**
+   * Overwrite the column total statistics. This special used when the column 
total statistics
+   * is known while all the page statistics are invalid, for example when 
rewriting the column.
+   *
+   * @param totalStatistics the column total statistics
+   */
+  public void invalidateStatistics(Statistics<?> totalStatistics) {
+    Preconditions.checkArgument(totalStatistics != null, "Column total 
statistics can not be null");
+    currentStatistics = totalStatistics;
+    // Invalid the ColumnIndex
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+  }
+
   /**
    * end a column (once all rep, def and data have been written)
    * @throws IOException if there is an error while writing
@@ -1317,6 +1318,26 @@ public class ParquetFileWriter {
     return (int)size;
   }
 
+  private void mergeColumnStatistics(Statistics<?> statistics) {
+    if (currentStatistics != null && currentStatistics.isEmpty()) {
+      return;
+    }
+
+    if (statistics == null || statistics.isEmpty()) {
+      // The column index and statistics should be invalid if some page 
statistics are null or empty.
+      // See PARQUET-2365 for more details
+      currentStatistics = 
Statistics.getBuilderForReading(currentChunkType).build();
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    } else if (currentStatistics == null) {
+      // Copying the statistics if it is not initialized yet, so we have the 
correct typed one
+      currentStatistics = statistics.copy();
+      columnIndexBuilder.add(statistics);
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+      columnIndexBuilder.add(statistics);
+    }
+  }
+
   private static void serializeOffsetIndexes(
       List<List<OffsetIndex>> offsetIndexes,
       List<BlockMetaData> blocks,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 2eaaab18c..bf2155e28 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -392,6 +392,7 @@ public class ParquetRewriter implements Closeable {
     DictionaryPage dictionaryPage = null;
     long readValues = 0;
     Statistics<?> statistics = null;
+    boolean isColumnStatisticsMalformed = false;
     ParquetMetadataConverter converter = new ParquetMetadataConverter();
     int pageOrdinal = 0;
     long totalChunkValues = chunk.getValueCount();
@@ -439,6 +440,14 @@ public class ParquetRewriter implements Closeable {
                   dataPageAAD);
           statistics = convertStatistics(
                   originalCreatedBy, chunk.getPrimitiveType(), 
headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reach here means both the columnIndex and the page header 
statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+              !isColumnStatisticsMalformed,
+              "Detected mixed null page statistics and non-null page 
statistics");
+          }
           readValues += headerV1.getNum_values();
           if (offsetIndex != null) {
             long rowCount = 1 + offsetIndex.getLastRowIndex(
@@ -490,6 +499,14 @@ public class ParquetRewriter implements Closeable {
                   dataPageAAD);
           statistics = convertStatistics(
                   originalCreatedBy, chunk.getPrimitiveType(), 
headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reach here means both the columnIndex and the page header 
statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+              !isColumnStatisticsMalformed,
+              "Detected mixed null page statistics and non-null page 
statistics");
+          }
           readValues += headerV2.getNum_values();
           writer.writeDataPageV2(headerV2.getNum_rows(),
                   headerV2.getNum_nulls(),
@@ -509,6 +526,11 @@ public class ParquetRewriter implements Closeable {
           break;
       }
     }
+
+    if (isColumnStatisticsMalformed) {
+      // All the column statistics are invalid, so we need to overwrite the 
column statistics
+      writer.invalidateStatistics(chunk.getStatistics());
+    }
   }
 
   private Statistics<?> convertStatistics(String createdBy,
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 6ce7e2c91..ab25566a9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -80,6 +80,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -127,17 +129,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -199,17 +191,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -276,17 +258,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
@@ -672,6 +644,8 @@ public class ParquetRewriterTest {
       new PrimitiveType(OPTIONAL, INT64, "DocId"),
       new PrimitiveType(REQUIRED, BINARY, "Name"),
       new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+      new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+      new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
       new GroupType(OPTIONAL, "Links",
         new PrimitiveType(REPEATED, BINARY, "Backward"),
         new PrimitiveType(REPEATED, BINARY, "Forward")));
@@ -713,6 +687,16 @@ public class ParquetRewriterTest {
                 expectGroup.getBinary("Gender", 0).getBytes());
       }
 
+      if (!prunePaths.contains("FloatFraction") && 
!nullifiedPaths.contains("FloatFraction")) {
+        assertEquals(group.getFloat("FloatFraction", 0),
+                expectGroup.getFloat("FloatFraction", 0), 0);
+      }
+
+      if (!prunePaths.contains("DoubleFraction") && 
!nullifiedPaths.contains("DoubleFraction")) {
+        assertEquals(group.getDouble("DoubleFraction", 0),
+                expectGroup.getDouble("DoubleFraction", 0), 0);
+      }
+
       Group subGroup = group.getGroup("Links", 0);
 
       if (!prunePaths.contains("Links.Backward") && 
!nullifiedPaths.contains("Links.Backward")) {
@@ -949,4 +933,20 @@ public class ParquetRewriterTest {
 
     return allBloomFilters;
   }
+
+  private void validateSchema() throws IOException {
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 5);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "FloatFraction");
+    assertEquals(fields.get(3).getName(), "DoubleFraction");
+    assertEquals(fields.get(4).getName(), "Links");
+    List<Type> subFields = fields.get(4).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 951899bb3..9eef65d7f 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -36,6 +36,8 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
@@ -175,7 +177,13 @@ public class TestFileBuilder
             else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
                 g.add(type.getName(), getString());
             }
-            // Only support 3 types now, more can be added later
+            else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+                g.add(type.getName(), getFloat());
+            }
+            else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+                g.add(type.getName(), getDouble());
+            }
+            // Only support 5 types now, more can be added later
         }
         else {
             GroupType groupType = (GroupType) type;
@@ -206,6 +214,22 @@ public class TestFileBuilder
         return sb.toString();
     }
 
+    private static float getFloat()
+    {
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            return Float.NaN;
+        }
+        return ThreadLocalRandom.current().nextFloat();
+    }
+
+    private static double getDouble()
+    {
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            return Double.NaN;
+        }
+        return ThreadLocalRandom.current().nextDouble();
+    }
+
     public static String createTempFile(String prefix)
     {
         try {

Reply via email to