This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new de08d8d49 PARQUET-2348: Recompression/Re-encrypt should rewrite bloomfilter (#1143)
de08d8d49 is described below
commit de08d8d49d6e566928bfe2846fe3f757951b78ab
Author: Xianyang Liu <[email protected]>
AuthorDate: Thu Oct 12 09:16:04 2023 +0800
PARQUET-2348: Recompression/Re-encrypt should rewrite bloomfilter (#1143)
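Before this change, ParquetRewriter dropped the bloom filters of the input file when recompressing or re-encrypting: ParquetFileWriter.addBloomFilter was package-private and never called on the rewrite path. The fix below reads each column chunk's bloom filter and registers it on the output writer. A minimal usage sketch, assuming the RewriteOptions/ParquetRewriter API as exercised by ParquetRewriterTest (the paths and target codec are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    Configuration conf = new Configuration();
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("/tmp/input.parquet"), new Path("/tmp/output.parquet"))
        .transform(CompressionCodecName.ZSTD) // recompress; bloom filters are now carried over
        .build();
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }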
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 4 +-
.../parquet/hadoop/rewrite/ParquetRewriter.java | 4 +
.../hadoop/rewrite/ParquetRewriterTest.java | 114 ++++++++++++++++++++-
.../parquet/hadoop/util/TestFileBuilder.java | 12 +++
4 files changed, 131 insertions(+), 3 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 80b9907a2..a71121e11 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -738,12 +738,12 @@ public class ParquetFileWriter {
}
/**
- * Add a Bloom filter that will be written out. This is only used in unit test.
+ * Add a Bloom filter that will be written out.
*
* @param column the column name
* @param bloomFilter the bloom filter of column values
*/
- void addBloomFilter(String column, BloomFilter bloomFilter) {
+ public void addBloomFilter(String column, BloomFilter bloomFilter) {
currentBloomFilters.put(column, bloomFilter);
}
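Making addBloomFilter public (previously package-private and, per the old javadoc, used only by unit tests) is what allows ParquetRewriter, which lives in another package, to register copied filters on the output writer. A minimal sketch of the call, assuming writer is a ParquetFileWriter with a row group in progress ("DocId" is an illustrative column name):

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.io.api.Binary;

    // Build a bloom filter by hand and register it; the writer serializes
    // the filters it has collected when the row group is finalized.
    BlockSplitBloomFilter bloomFilter = new BlockSplitBloomFilter(1024 * 1024);
    bloomFilter.insertHash(bloomFilter.hash(Binary.fromString("some-value")));
    writer.addBloomFilter("DocId", bloomFilter);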
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 035748579..d336aaf57 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -366,6 +366,10 @@ public class ParquetRewriter implements Closeable {
ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+ BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+ if (bloomFilter != null) {
+ writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
+ }
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
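The placement of the new read matters: readBloomFilter seeks the underlying stream to the bloom filter offset recorded in the chunk metadata, so the existing setStreamPosition call re-positions the stream at the chunk start before the page bytes are copied. A condensed sketch of the resulting per-chunk pattern (a simplification of the loop above, not the full implementation):

    // For each column chunk: carry the bloom filter over, then rewind and copy.
    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
      BloomFilter bloomFilter = reader.readBloomFilter(chunk); // may seek the stream
      if (bloomFilter != null) {
        writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
      }
      reader.setStreamPosition(chunk.getStartingPos()); // rewind before copying pages
      // ... translate/copy dictionary and data pages ...
    }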
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index a08633d15..62d0a28ca 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -26,9 +26,11 @@ import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
import org.apache.parquet.format.DataPageHeader;
@@ -41,6 +43,7 @@ import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -538,11 +541,74 @@ public class ParquetRewriterTest {
testPruneSingleColumnTranslateCodec(inputPaths);
}
+ @Test
+ public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws Exception {
+ testSingleInputFileSetupWithBloomFilter("GZIP", "DocId");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneSingleColumnTranslateCodec(inputPaths);
+
+ // Verify bloom filters
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
+ Map<ColumnPath, List<BloomFilter>> outputBloomFilters = allOutputBloomFilters(null);
+ assertEquals(inputBloomFilters, outputBloomFilters);
+ }
+
+ @Test
+ public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws Exception {
+ testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneNullifyTranslateCodec(inputPaths);
+
+ // Verify bloom filters
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
+ assertEquals(inputBloomFilters.size(), 2);
+ assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("Links.Forward")));
+ assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
+
+ Map<ColumnPath, List<BloomFilter>> outputBloomFilters = allOutputBloomFilters(null);
+ assertEquals(outputBloomFilters.size(), 1);
+ assertTrue(outputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
+
+ inputBloomFilters.remove(ColumnPath.fromDotString("Links.Forward"));
+ assertEquals(inputBloomFilters, outputBloomFilters);
+ }
+
+ @Test
+ public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws Exception {
+ testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneEncryptTranslateCodec(inputPaths);
+
+ // Verify bloom filters
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters = allInputBloomFilters(null);
+
+ // Cannot read without FileDecryptionProperties
+ assertThrows(ParquetCryptoRuntimeException.class, () -> allOutputBloomFilters(null));
+
+ FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
+ Map<ColumnPath, List<BloomFilter>> outputBloomFilters = allOutputBloomFilters(fileDecryptionProperties);
+ assertEquals(inputBloomFilters, outputBloomFilters);
+ }
+
private void testSingleInputFileSetup(String compression) throws IOException {
testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
}
- private void testSingleInputFileSetup(String compression, long rowGroupSize) throws IOException {
+ private void testSingleInputFileSetupWithBloomFilter(
+ String compression,
+ String... bloomFilterEnabledColumns) throws IOException {
+ testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE, bloomFilterEnabledColumns);
+ }
+
+ private void testSingleInputFileSetup(String compression,
+ long rowGroupSize,
+ String... bloomFilterEnabledColumns) throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
@@ -550,6 +616,7 @@ public class ParquetRewriterTest {
.withCodec(compression)
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withRowGroupSize(rowGroupSize)
+ .withBloomFilterEnabled(bloomFilterEnabledColumns)
.build());
}
@@ -807,4 +874,49 @@ public class ParquetRewriterTest {
assertEquals(inputRowCounts, outputRowCounts);
}
+ private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters(
+ FileDecryptionProperties fileDecryptionProperties) throws Exception {
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ Map<ColumnPath, List<BloomFilter>> bloomFilters =
+ allBloomFilters(inputFile.getFileName(), fileDecryptionProperties);
+ for (Map.Entry<ColumnPath, List<BloomFilter>> entry : bloomFilters.entrySet()) {
+ List<BloomFilter> bloomFilterList = inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
+ bloomFilterList.addAll(entry.getValue());
+ inputBloomFilters.put(entry.getKey(), bloomFilterList);
+ }
+ }
+
+ return inputBloomFilters;
+ }
+
+ private Map<ColumnPath, List<BloomFilter>> allOutputBloomFilters(
+ FileDecryptionProperties fileDecryptionProperties) throws Exception {
+ return allBloomFilters(outputFile, fileDecryptionProperties);
+ }
+
+ private Map<ColumnPath, List<BloomFilter>> allBloomFilters(
+ String path, FileDecryptionProperties fileDecryptionProperties) throws Exception {
+ Map<ColumnPath, List<BloomFilter>> allBloomFilters = new HashMap<>();
+ ParquetReadOptions readOptions = ParquetReadOptions.builder()
+ .withDecryption(fileDecryptionProperties)
+ .build();
+ InputFile inputFile = HadoopInputFile.fromPath(new Path(path), conf);
+ try (TransParquetFileReader reader = new TransParquetFileReader(inputFile, readOptions)) {
+ ParquetMetadata metadata = reader.getFooter();
+ for (BlockMetaData blockMetaData: metadata.getBlocks()) {
+ for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
+ BloomFilter bloomFilter = reader.readBloomFilter(columnChunkMetaData);
+ if (bloomFilter != null) {
+ List<BloomFilter> bloomFilterList =
+ allBloomFilters.getOrDefault(columnChunkMetaData.getPath(), new ArrayList<>());
+ bloomFilterList.add(bloomFilter);
+ allBloomFilters.put(columnChunkMetaData.getPath(), bloomFilterList);
+ }
+ }
+ }
+ }
+
+ return allBloomFilters;
+ }
}
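The TestFileBuilder change below simply forwards per-column flags to ParquetWriter's builder. For reference, a minimal sketch of enabling a bloom filter directly on a writer, assuming ExampleParquetWriter and a schema defined elsewhere ("DocId" and the path are illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;

    try (ParquetWriter<Group> writer =
             ExampleParquetWriter.builder(new Path("/tmp/bf.parquet"))
                 .withType(schema)                      // MessageType defined elsewhere
                 .withBloomFilterEnabled("DocId", true) // per-column opt-in
                 .build()) {
      // write Group records here
    }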
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index bea744cf5..951899bb3 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -52,6 +52,7 @@ public class TestFileBuilder
private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
private Boolean footerEncryption = false;
private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+ private String[] bloomFilterEnabled = {};
public TestFileBuilder(Configuration conf, MessageType schema)
{
@@ -114,6 +115,12 @@ public class TestFileBuilder
return this;
}
+ public TestFileBuilder withBloomFilterEnabled(String[] bloomFilterEnabled)
+ {
+ this.bloomFilterEnabled = bloomFilterEnabled;
+ return this;
+ }
+
public EncryptionTestFile build()
throws IOException
{
@@ -129,6 +136,11 @@ public class TestFileBuilder
.withRowGroupSize(rowGroupSize)
.withEncryption(encryptionProperties)
.withCompressionCodec(CompressionCodecName.valueOf(codec));
+
+ for (String columnPath: bloomFilterEnabled) {
+ builder.withBloomFilterEnabled(columnPath, true);
+ }
+
try (ParquetWriter writer = builder.build()) {
for (int i = 0; i < fileContent.length; i++) {
writer.write(fileContent[i]);