This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new afd39dde8 PARQUET-2410: Use row count instead of value count to get row count from OffsetIndex (#1234)
afd39dde8 is described below

commit afd39dde8fd762bf696fea3dab16d45eae1093c3
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Dec 11 10:04:33 2023 +0800

    PARQUET-2410: Use row count instead of value count to get row count from OffsetIndex (#1234)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 15 ++++++-
 .../hadoop/rewrite/ParquetRewriterTest.java        |  6 ++-
 .../parquet/hadoop/util/TestFileBuilder.java       | 47 +++++++++++++++-------
 3 files changed, 49 insertions(+), 19 deletions(-)
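
Context for the fix: OffsetIndex.getLastRowIndex(pageIndex, rowCount) expects
the row group's total row count as its second argument. For columns with
repeated (e.g. LIST) fields, a chunk's value count can exceed the row count,
so passing chunk.getValueCount() overstated the last row index of the final
page. A minimal sketch of the contract, assuming the default implementation in
parquet-mr's OffsetIndex (illustrative, not copied from the project source):

    // Last row index of a page: one less than the next page's first row
    // index, or the last row of the row group for the final page.
    long getLastRowIndex(int pageIndex, long rowGroupRowCount) {
      int nextPage = pageIndex + 1;
      return (nextPage >= getPageCount() ? rowGroupRowCount : getFirstRowIndex(nextPage)) - 1;
    }

Only the final page's result depends on the second argument, which is exactly
where the rewriter computed a wrong per-page row count for repeated columns.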

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 5411f0a79..fac19df17 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -326,6 +326,7 @@ public class ParquetRewriter implements Closeable {
           // Translate compression and/or encryption
           writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
           processChunk(
+              blockMetaData.getRowCount(),
               chunk,
               newCodecName,
               columnChunkEncryptorRunTime,
@@ -352,6 +353,7 @@ public class ParquetRewriter implements Closeable {
   }
 
   private void processChunk(
+      long blockRowCount,
       ColumnChunkMetaData chunk,
       CompressionCodecName newCodecName,
       ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
@@ -391,7 +393,8 @@ public class ParquetRewriter implements Closeable {
 
     reader.setStreamPosition(chunk.getStartingPos());
     DictionaryPage dictionaryPage = null;
-    long readValues = 0;
+    long readValues = 0L;
+    long readRows = 0L;
     Statistics<?> statistics = null;
     boolean isColumnStatisticsMalformed = false;
     ParquetMetadataConverter converter = new ParquetMetadataConverter();
@@ -459,8 +462,9 @@ public class ParquetRewriter implements Closeable {
           readValues += headerV1.getNum_values();
           if (offsetIndex != null) {
             long rowCount = 1
-                + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues)
+                + offsetIndex.getLastRowIndex(pageOrdinal, blockRowCount)
                 - offsetIndex.getFirstRowIndex(pageOrdinal);
+            readRows += rowCount;
             writer.writeDataPage(
                 toIntWithCheck(headerV1.getNum_values()),
                 pageHeader.getUncompressed_page_size(),
@@ -524,6 +528,7 @@ public class ParquetRewriter implements Closeable {
                 "Detected mixed null page statistics and non-null page 
statistics");
           }
           readValues += headerV2.getNum_values();
+          readRows += headerV2.getNum_rows();
           writer.writeDataPageV2(
               headerV2.getNum_rows(),
               headerV2.getNum_nulls(),
@@ -544,6 +549,12 @@ public class ParquetRewriter implements Closeable {
       }
     }
 
+    Preconditions.checkState(
+        readRows == 0 || readRows == blockRowCount,
+        "Read row count: %s not match with block total row count: %s",
+        readRows,
+        blockRowCount);
+
     if (isColumnStatisticsMalformed) {
      // All the column statistics are invalid, so we need to overwrite the column statistics
       writer.invalidateStatistics(chunk.getStatistics());
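
The new check tolerates readRows == 0 because, for V1 data pages, readRows is
only accumulated when an offset index is present; without one the rewriter has
no per-page row counts to sum. A hedged sketch of the same invariant in plain
Java (variable names follow the diff; the message wording here is
illustrative):

    // After draining a chunk's pages: either no offset index was available
    // (readRows stayed 0), or every row of the block must be accounted for.
    if (readRows != 0 && readRows != blockRowCount) {
      throw new IllegalStateException(String.format(
          "Read row count %d does not match block row count %d", readRows, blockRowCount));
    }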
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index aedf3b8c3..3a6548a68 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -882,6 +882,8 @@ public class ParquetRewriterTest {
         BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(outBlockId);
 
+        assertEquals(inBlockMetaData.getRowCount(), outBlockMetaData.getRowCount());
+
         for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
           if (!outFileColumnMapping.containsKey(j)) {
             continue;
@@ -908,8 +910,8 @@ public class ParquetRewriterTest {
             for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
              assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k));
               assertEquals(
-                  inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()),
-                  outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount()));
+                  inOffsetIndex.getLastRowIndex(k, inBlockMetaData.getRowCount()),
+                  outOffsetIndex.getLastRowIndex(k, outBlockMetaData.getRowCount()));
              assertEquals(inOffsetIndex.getOffset(k), (long) inOffsets.get(k));
              assertEquals(outOffsetIndex.getOffset(k), (long) outOffsets.get(k));
             }
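
A property these assertions imply, given the OffsetIndex contract: for the
final page, the last row index must equal rowCount - 1 in both files. A small
illustrative check under that assumption (locals as in the test):

    // Final page of the chunk: last row index is rowCount - 1.
    int lastPage = inOffsetIndex.getPageCount() - 1;
    assertEquals(
        inBlockMetaData.getRowCount() - 1,
        inOffsetIndex.getLastRowIndex(lastPage, inBlockMetaData.getRowCount()));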
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 25f53cf49..809b4fd35 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -129,9 +129,9 @@ public class TestFileBuilder {
       builder.withBloomFilterEnabled(columnPath, true);
     }
 
-    try (ParquetWriter writer = builder.build()) {
-      for (int i = 0; i < fileContent.length; i++) {
-        writer.write(fileContent[i]);
+    try (ParquetWriter<Group> writer = builder.build()) {
+      for (SimpleGroup simpleGroup : fileContent) {
+        writer.write(simpleGroup);
       }
     }
     return new EncryptionTestFile(fileName, fileContent);
@@ -152,18 +152,7 @@ public class TestFileBuilder {
   private void addValueToSimpleGroup(Group g, Type type) {
     if (type.isPrimitive()) {
       PrimitiveType primitiveType = (PrimitiveType) type;
-      if (primitiveType.getPrimitiveTypeName().equals(INT32)) {
-        g.add(type.getName(), getInt());
-      } else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
-        g.add(type.getName(), getLong());
-      } else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
-        g.add(type.getName(), getString());
-      } else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
-        g.add(type.getName(), getFloat());
-      } else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
-        g.add(type.getName(), getDouble());
-      }
-      // Only support 5 types now, more can be added later
+      addPrimitiveValueToSimpleGroup(g, primitiveType);
     } else {
       GroupType groupType = (GroupType) type;
       Group parentGroup = g.addGroup(groupType.getName());
@@ -173,6 +162,34 @@ public class TestFileBuilder {
     }
   }
 
+  private void addPrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
+    if (primitiveType.isRepetition(Type.Repetition.REPEATED)) {
+      int listSize = ThreadLocalRandom.current().nextInt(1, 10);
+      for (int i = 0; i < listSize; i++) {
+        addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
+      }
+    } else {
+      addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
+    }
+  }
+
+  private void addSinglePrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
+    if (primitiveType.getPrimitiveTypeName().equals(INT32)) {
+      g.add(primitiveType.getName(), getInt());
+    } else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
+      g.add(primitiveType.getName(), getLong());
+    } else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
+      g.add(primitiveType.getName(), getString());
+    } else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+      g.add(primitiveType.getName(), getFloat());
+    } else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+      g.add(primitiveType.getName(), getDouble());
+    } else {
+      throw new UnsupportedOperationException("Unsupported type: " + primitiveType);
+    }
+    // Only support 5 types now, more can be added later
+  }
+
   private static long getInt() {
     return ThreadLocalRandom.current().nextInt(10000);
   }
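
The TestFileBuilder changes make it possible to generate files that reproduce
the bug: a REPEATED primitive field writes several values per row, so the
chunk's value count exceeds the row group's row count. A hedged sketch of such
a schema and row using parquet-mr's example API (the "tags"/"msg" names are
made up for illustration):

    import org.apache.parquet.example.data.simple.SimpleGroup;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Types;

    MessageType schema = Types.buildMessage()
        .repeated(PrimitiveTypeName.INT32).named("tags")
        .named("msg");
    SimpleGroup row = new SimpleGroup(schema);
    row.add("tags", 1);
    row.add("tags", 2);
    row.add("tags", 3); // one row, three values: valueCount > rowCount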
