This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new afd39dde8 PARQUET-2410: Use row count instead of value count to get row count from OffsetIndex (#1234)
afd39dde8 is described below
commit afd39dde8fd762bf696fea3dab16d45eae1093c3
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Dec 11 10:04:33 2023 +0800
PARQUET-2410: Use row count instead of value count to get row count from OffsetIndex (#1234)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 15 ++++++-
.../hadoop/rewrite/ParquetRewriterTest.java | 6 ++-
.../parquet/hadoop/util/TestFileBuilder.java | 47 +++++++++++++++-------
3 files changed, 49 insertions(+), 19 deletions(-)
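
For context on the bug being fixed: when a schema contains repeated fields, a column chunk holds more values than the row group has rows, so a value count must never stand in for a row count. A minimal sketch of that distinction using parquet-mr's example Group API; the schema and values here are hypothetical and only illustrate the mismatch:

    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class RowsVersusValues {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message test { repeated int32 tags; }");
        // One logical row that contributes three values to the "tags" chunk,
        // so the chunk's value count (3) exceeds its row count (1).
        Group row = new SimpleGroupFactory(schema).newGroup();
        row.add("tags", 1);
        row.add("tags", 2);
        row.add("tags", 3);
        System.out.println("values in this row: " + row.getFieldRepetitionCount("tags")); // 3
      }
    }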
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 5411f0a79..fac19df17 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -326,6 +326,7 @@ public class ParquetRewriter implements Closeable {
// Translate compression and/or encryption
writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
processChunk(
+ blockMetaData.getRowCount(),
chunk,
newCodecName,
columnChunkEncryptorRunTime,
@@ -352,6 +353,7 @@ public class ParquetRewriter implements Closeable {
}
private void processChunk(
+ long blockRowCount,
ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
@@ -391,7 +393,8 @@ public class ParquetRewriter implements Closeable {
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
- long readValues = 0;
+ long readValues = 0L;
+ long readRows = 0L;
Statistics<?> statistics = null;
boolean isColumnStatisticsMalformed = false;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
@@ -459,8 +462,9 @@ public class ParquetRewriter implements Closeable {
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1
- + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues)
+ + offsetIndex.getLastRowIndex(pageOrdinal, blockRowCount)
- offsetIndex.getFirstRowIndex(pageOrdinal);
+ readRows += rowCount;
writer.writeDataPage(
toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
@@ -524,6 +528,7 @@ public class ParquetRewriter implements Closeable {
"Detected mixed null page statistics and non-null page
statistics");
}
readValues += headerV2.getNum_values();
+ readRows += headerV2.getNum_rows();
writer.writeDataPageV2(
headerV2.getNum_rows(),
headerV2.getNum_nulls(),
@@ -544,6 +549,12 @@ public class ParquetRewriter implements Closeable {
}
}
+ Preconditions.checkState(
+ readRows == 0 || readRows == blockRowCount,
+ "Read row count: %s not match with block total row count: %s",
+ readRows,
+ blockRowCount);
+
if (isColumnStatisticsMalformed) {
// All the column statistics are invalid, so we need to overwrite the column statistics
writer.invalidateStatistics(chunk.getStatistics());
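
The second argument of OffsetIndex.getLastRowIndex is the row group's total row count, and for the last page the default implementation effectively returns that total minus one. A sketch of the arithmetic in processChunk with hypothetical numbers (a chunk of 100 rows and 300 values whose last page starts at row 90) shows how passing the value count inflated the per-page row count; the new Preconditions.checkState catches exactly this kind of mismatch:

    public class LastRowIndexArithmetic {
      public static void main(String[] args) {
        long firstRowIndexOfLastPage = 90; // hypothetical offset-index value

        // For the last page, getLastRowIndex(page, total) is (total - 1):
        long withValueCount = 300 - 1; // old code passed totalChunkValues
        long withRowCount = 100 - 1;   // fixed code passes blockRowCount

        // rowCount = 1 + lastRowIndex - firstRowIndex, as in processChunk:
        System.out.println(1 + withValueCount - firstRowIndexOfLastPage); // 210 rows, impossible
        System.out.println(1 + withRowCount - firstRowIndexOfLastPage);   // 10 rows, correct
      }
    }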
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index aedf3b8c3..3a6548a68 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -882,6 +882,8 @@ public class ParquetRewriterTest {
BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(outBlockId);
+ assertEquals(inBlockMetaData.getRowCount(), outBlockMetaData.getRowCount());
+
for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
if (!outFileColumnMapping.containsKey(j)) {
continue;
@@ -908,8 +910,8 @@ public class ParquetRewriterTest {
for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k));
assertEquals(
- inOffsetIndex.getLastRowIndex(k, inChunk.getValueCount()),
- outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount()));
+ inOffsetIndex.getLastRowIndex(k, inBlockMetaData.getRowCount()),
+ outOffsetIndex.getLastRowIndex(k, outBlockMetaData.getRowCount()));
assertEquals(inOffsetIndex.getOffset(k), (long) inOffsets.get(k));
assertEquals(outOffsetIndex.getOffset(k), (long) outOffsets.get(k));
}
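
The assertions above exercise the corrected contract on both the input and output files. For readers who want to inspect the same metadata outside the test, a hedged sketch using ParquetFileReader's public API; the file path is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.internal.column.columnindex.OffsetIndex;

    public class InspectOffsetIndex {
      public static void main(String[] args) throws IOException {
        Path path = new Path("/tmp/example.parquet"); // hypothetical file
        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            for (ColumnChunkMetaData chunk : block.getColumns()) {
              OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
              if (offsetIndex == null) continue; // no offset index written
              int lastPage = offsetIndex.getPageCount() - 1;
              // The second argument must be the block's ROW count (the fix):
              long lastRow = offsetIndex.getLastRowIndex(lastPage, block.getRowCount());
              System.out.println(chunk.getPath() + " last row index: " + lastRow);
            }
          }
        }
      }
    }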
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 25f53cf49..809b4fd35 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -129,9 +129,9 @@ public class TestFileBuilder {
builder.withBloomFilterEnabled(columnPath, true);
}
- try (ParquetWriter writer = builder.build()) {
- for (int i = 0; i < fileContent.length; i++) {
- writer.write(fileContent[i]);
+ try (ParquetWriter<Group> writer = builder.build()) {
+ for (SimpleGroup simpleGroup : fileContent) {
+ writer.write(simpleGroup);
}
}
return new EncryptionTestFile(fileName, fileContent);
@@ -152,18 +152,7 @@ public class TestFileBuilder {
private void addValueToSimpleGroup(Group g, Type type) {
if (type.isPrimitive()) {
PrimitiveType primitiveType = (PrimitiveType) type;
- if (primitiveType.getPrimitiveTypeName().equals(INT32)) {
- g.add(type.getName(), getInt());
- } else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
- g.add(type.getName(), getLong());
- } else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
- g.add(type.getName(), getString());
- } else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
- g.add(type.getName(), getFloat());
- } else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
- g.add(type.getName(), getDouble());
- }
- // Only support 5 types now, more can be added later
+ addPrimitiveValueToSimpleGroup(g, primitiveType);
} else {
GroupType groupType = (GroupType) type;
Group parentGroup = g.addGroup(groupType.getName());
@@ -173,6 +162,34 @@ public class TestFileBuilder {
}
}
+ private void addPrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
+ if (primitiveType.isRepetition(Type.Repetition.REPEATED)) {
+ int listSize = ThreadLocalRandom.current().nextInt(1, 10);
+ for (int i = 0; i < listSize; i++) {
+ addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
+ }
+ } else {
+ addSinglePrimitiveValueToSimpleGroup(g, primitiveType);
+ }
+ }
+
+ private void addSinglePrimitiveValueToSimpleGroup(Group g, PrimitiveType primitiveType) {
+ if (primitiveType.getPrimitiveTypeName().equals(INT32)) {
+ g.add(primitiveType.getName(), getInt());
+ } else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
+ g.add(primitiveType.getName(), getLong());
+ } else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
+ g.add(primitiveType.getName(), getString());
+ } else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+ g.add(primitiveType.getName(), getFloat());
+ } else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+ g.add(primitiveType.getName(), getDouble());
+ } else {
+ throw new UnsupportedOperationException("Unsupported type: " + primitiveType);
+ }
+ // Only support 5 types now, more can be added later
+ }
+
private static long getInt() {
return ThreadLocalRandom.current().nextInt(10000);
}
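
With the new REPEATED branch, each generated row contributes between 1 and 9 values to a repeated column (ThreadLocalRandom.nextInt's upper bound is exclusive), so the test files now contain column chunks whose value counts genuinely exceed their row counts. A standalone sketch of that effect, with a hypothetical row count:

    import java.util.concurrent.ThreadLocalRandom;

    public class RepeatedValueCounts {
      public static void main(String[] args) {
        int rows = 100; // hypothetical
        long values = 0;
        for (int i = 0; i < rows; i++) {
          // Mirrors the new REPEATED branch: 1..9 values per row.
          values += ThreadLocalRandom.current().nextInt(1, 10);
        }
        // values will almost surely exceed rows, the condition under which
        // the old value-count-based row arithmetic overcounted.
        System.out.println(rows + " rows -> " + values + " values");
      }
    }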