This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 87ee1b349 PARQUET-2438: Fixes minMaxSize for BinaryColumnIndexBuilder (#1279)
87ee1b349 is described below
commit 87ee1b3492615887fb9a9d58f25dd3d09a28b0ed
Author: Xianyang Liu <[email protected]>
AuthorDate: Tue Feb 27 09:43:06 2024 +0800
PARQUET-2438: Fixes minMaxSize for BinaryColumnIndexBuilder (#1279)
---
.../columnindex/BinaryColumnIndexBuilder.java | 7 +++
.../columnindex/BooleanColumnIndexBuilder.java | 5 +++
.../column/columnindex/ColumnIndexBuilder.java | 13 +++---
.../columnindex/DoubleColumnIndexBuilder.java | 5 +++
.../columnindex/FloatColumnIndexBuilder.java | 5 +++
.../column/columnindex/IntColumnIndexBuilder.java | 5 +++
.../column/columnindex/LongColumnIndexBuilder.java | 5 +++
.../column/columnindex/TestColumnIndexBuilder.java | 47 +++++++++++++++++++
.../apache/parquet/hadoop/ParquetFileWriter.java | 5 ++-
.../parquet/hadoop/TestDataPageChecksums.java | 1 +
.../parquet/hadoop/TestParquetFileWriter.java | 52 ++++++++++++++++++++--
11 files changed, 137 insertions(+), 13 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 96914a417..1c546b516 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -136,4 +136,11 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return ((Binary) value).length();
}
+
+ @Override
+ public long getMinMaxSize() {
+ long minSizesSum = minValues.stream().mapToLong(Binary::length).sum();
+ long maxSizesSum = maxValues.stream().mapToLong(Binary::length).sum();
+ return minSizesSum + maxSizesSum;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
index 7c07253bd..28436a182 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BooleanColumnIndexBuilder.java
@@ -129,4 +129,9 @@ class BooleanColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return 1;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return minValues.size() + maxValues.size();
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index 7dd19a87c..30b3f6fda 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -446,12 +446,16 @@ public abstract class ColumnIndexBuilder {
int sizeOf(Object value) {
return 0;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return 0;
+ }
};
private PrimitiveType type;
private final BooleanList nullPages = new BooleanArrayList();
private final LongList nullCounts = new LongArrayList();
- private long minMaxSize;
private final IntList pageIndexes = new IntArrayList();
private int nextPageIndex;
@@ -537,8 +541,6 @@ public abstract class ColumnIndexBuilder {
Object max = stats.genericGetMax();
addMinMax(min, max);
pageIndexes.add(nextPageIndex);
- minMaxSize += sizeOf(min);
- minMaxSize += sizeOf(max);
} else {
nullPages.add(true);
}
@@ -576,8 +578,6 @@ public abstract class ColumnIndexBuilder {
ByteBuffer max = maxValues.get(i);
addMinMaxFromBytes(min, max);
pageIndexes.add(i);
- minMaxSize += min.remaining();
- minMaxSize += max.remaining();
}
}
}
@@ -651,7 +651,6 @@ public abstract class ColumnIndexBuilder {
nullPages.clear();
nullCounts.clear();
clearMinMax();
- minMaxSize = 0;
nextPageIndex = 0;
pageIndexes.clear();
}
@@ -673,6 +672,6 @@ public abstract class ColumnIndexBuilder {
* @return the sum of size in bytes of the min/max values added so far to
this builder
*/
public long getMinMaxSize() {
- return minMaxSize;
+ throw new UnsupportedOperationException("Not implemented");
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
index 27bb19e25..5d5d54aa7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java
@@ -150,4 +150,9 @@ class DoubleColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return Double.BYTES;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return (long) minValues.size() * Double.BYTES + (long) maxValues.size() *
Double.BYTES;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
index d614cadd2..be66f85d1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java
@@ -150,4 +150,9 @@ class FloatColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return Float.BYTES;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return (long) minValues.size() * Float.BYTES + (long) maxValues.size() *
Float.BYTES;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
index b5c540b2b..a9048d8b4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IntColumnIndexBuilder.java
@@ -131,4 +131,9 @@ class IntColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return Integer.BYTES;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return (long) minValues.size() * Integer.BYTES + (long) maxValues.size() *
Integer.BYTES;
+ }
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
index 65f78c0b5..f5a382ebd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/LongColumnIndexBuilder.java
@@ -131,4 +131,9 @@ class LongColumnIndexBuilder extends ColumnIndexBuilder {
int sizeOf(Object value) {
return Long.BYTES;
}
+
+ @Override
+ public long getMinMaxSize() {
+ return (long) minValues.size() * Long.BYTES + (long) maxValues.size() *
Long.BYTES;
+ }
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 7e06608f6..0631300fc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -579,6 +579,53 @@ public class TestColumnIndexBuilder {
columnIndex, invert(userDefined(col, BinaryUtf8StartsWithB.class)), 0,
1, 2, 3, 4, 5, 6, 7);
}
+ @Test
+ public void testBinaryWithTruncate() {
+ PrimitiveType type =
Types.required(BINARY).as(UTF8).named("test_binary_utf8");
+ int truncateLen = 5;
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type,
truncateLen);
+ assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
+ assertNull(builder.build());
+
+ StatsBuilder sb = new StatsBuilder();
+ builder.add(sb.stats(type, null, null));
+ builder.add(sb.stats(type, stringBinary("Jeltz"),
stringBinary("Slartibartfast"), null, null));
+ builder.add(sb.stats(type, null, null, null, null, null));
+ builder.add(sb.stats(type, null, null));
+ builder.add(sb.stats(type, stringBinary("Beeblebrox"),
stringBinary("Prefect")));
+ builder.add(sb.stats(type, stringBinary("Dent"), stringBinary("Trilian"),
null));
+ builder.add(sb.stats(type, stringBinary("Beeblebrox")));
+ builder.add(sb.stats(type, null, null));
+ assertEquals(8, builder.getPageCount());
+ assertEquals(39, builder.getMinMaxSize());
+ ColumnIndex columnIndex = builder.build();
+ assertEquals(BoundaryOrder.UNORDERED, columnIndex.getBoundaryOrder());
+ assertCorrectNullCounts(columnIndex, 2, 2, 5, 2, 0, 1, 0, 2);
+ assertCorrectNullPages(columnIndex, true, false, true, true, false, false,
false, true);
+
+ BinaryTruncator truncator = BinaryTruncator.getTruncator(type);
+ assertCorrectValues(
+ columnIndex.getMaxValues(),
+ null,
+ truncator.truncateMax(stringBinary("Slartibartfast"), truncateLen),
+ null,
+ null,
+ truncator.truncateMax(stringBinary("Prefect"), truncateLen),
+ truncator.truncateMax(stringBinary("Trilian"), truncateLen),
+ truncator.truncateMax(stringBinary("Beeblebrox"), truncateLen),
+ null);
+ assertCorrectValues(
+ columnIndex.getMinValues(),
+ null,
+ truncator.truncateMin(stringBinary("Jeltz"), truncateLen),
+ null,
+ null,
+ truncator.truncateMin(stringBinary("Beeblebrox"), truncateLen),
+ truncator.truncateMin(stringBinary("Dent"), truncateLen),
+ truncator.truncateMin(stringBinary("Beeblebrox"), truncateLen),
+ null);
+ }
+
@Test
public void testStaticBuildBinary() {
ColumnIndex columnIndex = ColumnIndexBuilder.build(
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 9867964de..abc408779 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -524,6 +524,7 @@ public class ParquetFileWriter implements AutoCloseable {
* @param file the file to write to
* @param rowAndBlockSize the row group size
* @param maxPaddingSize the maximum padding
+ * @param columnIndexTruncateLength the length which the min/max values in
column indexes tried to be truncated to
* @param allocator allocator to potentially allocate {@link
java.nio.ByteBuffer} objects
* @throws IOException if the file can not be created
*/
@@ -533,6 +534,7 @@ public class ParquetFileWriter implements AutoCloseable {
Path file,
long rowAndBlockSize,
int maxPaddingSize,
+ int columnIndexTruncateLength,
ByteBufferAllocator allocator)
throws IOException {
FileSystem fs = file.getFileSystem(configuration);
@@ -540,8 +542,7 @@ public class ParquetFileWriter implements AutoCloseable {
this.alignment = PaddingAlignment.get(rowAndBlockSize, rowAndBlockSize,
maxPaddingSize);
this.out = HadoopStreams.wrap(fs.create(file, true, 8192,
fs.getDefaultReplication(file), rowAndBlockSize));
this.encodingStatsBuilder = new EncodingStats.Builder();
- // no truncation is needed for testing
- this.columnIndexTruncateLength = Integer.MAX_VALUE;
+ this.columnIndexTruncateLength = columnIndexTruncateLength;
this.pageWriteChecksumEnabled =
ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
this.crcAllocator = pageWriteChecksumEnabled
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
index 3ffa3806c..79a81a5e9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
@@ -136,6 +136,7 @@ public class TestDataPageChecksums {
path,
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
+ Integer.MAX_VALUE,
allocator);
writer.start();
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 8b85b181b..7d31bec00 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -36,6 +36,7 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -97,6 +98,7 @@ import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
@@ -180,7 +182,7 @@ public class TestParquetFileWriter {
private ParquetFileWriter createWriter(
Configuration conf, MessageType schema, Path path, long blockSize, int
maxPaddingSize) throws IOException {
- return new ParquetFileWriter(conf, schema, path, blockSize,
maxPaddingSize, allocator);
+ return new ParquetFileWriter(conf, schema, path, blockSize,
maxPaddingSize, Integer.MAX_VALUE, allocator);
}
@Test
@@ -1206,13 +1208,27 @@ public class TestParquetFileWriter {
@Test
public void testColumnIndexWriteRead() throws Exception {
+ // Don't truncate
+ testColumnIndexWriteRead(Integer.MAX_VALUE);
+ // Truncate to DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH
+
testColumnIndexWriteRead(ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ }
+
+ private void testColumnIndexWriteRead(int columnIndexTruncateLen) throws
Exception {
File testFile = temp.newFile();
testFile.delete();
Path path = new Path(testFile.toURI());
Configuration configuration = new Configuration();
- ParquetFileWriter w = createWriter(configuration, SCHEMA, path);
+ ParquetFileWriter w = new ParquetFileWriter(
+ configuration,
+ SCHEMA,
+ path,
+ DEFAULT_BLOCK_SIZE,
+ MAX_PADDING_SIZE_DEFAULT,
+ columnIndexTruncateLen,
+ allocator);
w.start();
w.startBlock(4);
w.startColumn(C1, 7, CODEC);
@@ -1336,8 +1352,36 @@ public class TestParquetFileWriter {
assertEquals(1, offsetIndex.getFirstRowIndex(1));
assertEquals(3, offsetIndex.getFirstRowIndex(2));
- assertNull(reader.readColumnIndex(
- footer.getBlocks().get(2).getColumns().get(0)));
+ if (columnIndexTruncateLen == Integer.MAX_VALUE) {
+ assertNull(reader.readColumnIndex(
+ footer.getBlocks().get(2).getColumns().get(0)));
+ } else {
+ blockMeta = footer.getBlocks().get(2);
+ assertNotNull(reader.readColumnIndex(blockMeta.getColumns().get(0)));
+ columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(0));
+ assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder());
+ assertTrue(Arrays.asList(0l).equals(columnIndex.getNullCounts()));
+ assertTrue(Arrays.asList(false).equals(columnIndex.getNullPages()));
+ minValues = columnIndex.getMinValues();
+ assertEquals(1, minValues.size());
+ maxValues = columnIndex.getMaxValues();
+ assertEquals(1, maxValues.size());
+
+ BinaryTruncator truncator =
+
BinaryTruncator.getTruncator(SCHEMA.getType(PATH1).asPrimitiveType());
+ assertEquals(
+ new String(new byte[1], StandardCharsets.UTF_8),
+ new String(minValues.get(0).array(), StandardCharsets.UTF_8));
+ byte[] truncatedMaxValue = truncator
+ .truncateMax(
+ Binary.fromConstantByteArray(new byte[(int) MAX_STATS_SIZE]),
columnIndexTruncateLen)
+ .getBytes();
+ assertEquals(
+ new String(truncatedMaxValue, StandardCharsets.UTF_8),
+ new String(maxValues.get(0).array(), StandardCharsets.UTF_8));
+
+ assertNull(reader.readColumnIndex(blockMeta.getColumns().get(1)));
+ }
}
}