This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new de08d8d49 PARQUET-2348: Recompression/Re-encrypt should rewrite 
bloomfilter (#1143)
de08d8d49 is described below

commit de08d8d49d6e566928bfe2846fe3f757951b78ab
Author: Xianyang Liu <[email protected]>
AuthorDate: Thu Oct 12 09:16:04 2023 +0800

    PARQUET-2348: Recompression/Re-encrypt should rewrite bloomfilter (#1143)
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   |   4 +-
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |   4 +
 .../hadoop/rewrite/ParquetRewriterTest.java        | 114 ++++++++++++++++++++-
 .../parquet/hadoop/util/TestFileBuilder.java       |  12 +++
 4 files changed, 131 insertions(+), 3 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 80b9907a2..a71121e11 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -738,12 +738,12 @@ public class ParquetFileWriter {
   }
 
   /**
-   * Add a Bloom filter that will be written out. This is only used in unit 
test.
+   * Add a Bloom filter that will be written out.
    *
    * @param column the column name
    * @param bloomFilter the bloom filter of column values
    */
-  void addBloomFilter(String column, BloomFilter bloomFilter)  {
+  public void addBloomFilter(String column, BloomFilter bloomFilter)  {
     currentBloomFilters.put(column , bloomFilter);
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 035748579..d336aaf57 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -366,6 +366,10 @@ public class ParquetRewriter implements Closeable {
 
     ColumnIndex columnIndex = reader.readColumnIndex(chunk);
     OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+    BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+    if (bloomFilter != null) {
+      writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
+    }
 
     reader.setStreamPosition(chunk.getStartingPos());
     DictionaryPage dictionaryPage = null;
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index a08633d15..62d0a28ca 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -26,9 +26,11 @@ import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
 import org.apache.parquet.format.DataPageHeader;
@@ -41,6 +43,7 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -538,11 +541,74 @@ public class ParquetRewriterTest {
     testPruneSingleColumnTranslateCodec(inputPaths);
   }
 
+  @Test
+  public void testPruneSingleColumnTranslateCodecAndEnableBloomFilter() throws 
Exception {
+    testSingleInputFileSetupWithBloomFilter("GZIP", "DocId");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+
+    // Verify bloom filters
+    Map<ColumnPath, List<BloomFilter>> inputBloomFilters = 
allInputBloomFilters(null);
+    Map<ColumnPath, List<BloomFilter>> outputBloomFilters = 
allOutputBloomFilters(null);
+    assertEquals(inputBloomFilters, outputBloomFilters);
+  }
+
+  @Test
+  public void testPruneNullifyTranslateCodecAndEnableBloomFilter() throws 
Exception {
+    testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneNullifyTranslateCodec(inputPaths);
+
+    // Verify bloom filters
+    Map<ColumnPath, List<BloomFilter>> inputBloomFilters = 
allInputBloomFilters(null);
+    assertEquals(inputBloomFilters.size(), 2);
+    
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("Links.Forward")));
+    
assertTrue(inputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
+
+    Map<ColumnPath, List<BloomFilter>> outputBloomFilters = 
allOutputBloomFilters(null);
+    assertEquals(outputBloomFilters.size(), 1);
+    
assertTrue(outputBloomFilters.containsKey(ColumnPath.fromDotString("DocId")));
+
+    inputBloomFilters.remove(ColumnPath.fromDotString("Links.Forward"));
+    assertEquals(inputBloomFilters, outputBloomFilters);
+  }
+
+  @Test
+  public void testPruneEncryptTranslateCodecAndEnableBloomFilter() throws 
Exception {
+    testSingleInputFileSetupWithBloomFilter("GZIP", "DocId", "Links.Forward");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneEncryptTranslateCodec(inputPaths);
+
+    // Verify bloom filters
+    Map<ColumnPath, List<BloomFilter>> inputBloomFilters = 
allInputBloomFilters(null);
+
+    // Cannot read without FileDecryptionProperties
+    assertThrows(ParquetCryptoRuntimeException.class, () -> 
allOutputBloomFilters(null));
+
+    FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
+    Map<ColumnPath, List<BloomFilter>> outputBloomFilters = 
allOutputBloomFilters(fileDecryptionProperties);
+    assertEquals(inputBloomFilters, outputBloomFilters);
+  }
+
   private void testSingleInputFileSetup(String compression) throws IOException 
{
     testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
   }
 
-  private void testSingleInputFileSetup(String compression, long rowGroupSize) 
throws IOException {
+  private void testSingleInputFileSetupWithBloomFilter(
+    String compression,
+    String... bloomFilterEnabledColumns) throws IOException {
+    testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE, 
bloomFilterEnabledColumns);
+  }
+
+  private void testSingleInputFileSetup(String compression,
+                                        long rowGroupSize,
+                                        String... bloomFilterEnabledColumns) 
throws IOException {
     MessageType schema = createSchema();
     inputFiles = Lists.newArrayList();
     inputFiles.add(new TestFileBuilder(conf, schema)
@@ -550,6 +616,7 @@ public class ParquetRewriterTest {
       .withCodec(compression)
       .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
       .withRowGroupSize(rowGroupSize)
+      .withBloomFilterEnabled(bloomFilterEnabledColumns)
       .build());
   }
 
@@ -807,4 +874,49 @@ public class ParquetRewriterTest {
     assertEquals(inputRowCounts, outputRowCounts);
   }
 
+  private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters(
+    FileDecryptionProperties fileDecryptionProperties) throws Exception {
+    Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      Map<ColumnPath, List<BloomFilter>> bloomFilters =
+        allBloomFilters(inputFile.getFileName(), fileDecryptionProperties);
+      for (Map.Entry<ColumnPath, List<BloomFilter>> entry : 
bloomFilters.entrySet()) {
+        List<BloomFilter> bloomFilterList = 
inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
+        bloomFilterList.addAll(entry.getValue());
+        inputBloomFilters.put(entry.getKey(), bloomFilterList);
+      }
+    }
+
+    return inputBloomFilters;
+  }
+
+  private Map<ColumnPath, List<BloomFilter>> allOutputBloomFilters(
+    FileDecryptionProperties fileDecryptionProperties) throws Exception {
+    return allBloomFilters(outputFile, fileDecryptionProperties);
+  }
+
+  private Map<ColumnPath, List<BloomFilter>> allBloomFilters(
+    String path, FileDecryptionProperties fileDecryptionProperties) throws 
Exception {
+    Map<ColumnPath, List<BloomFilter>> allBloomFilters = new HashMap<>();
+    ParquetReadOptions readOptions = ParquetReadOptions.builder()
+      .withDecryption(fileDecryptionProperties)
+      .build();
+    InputFile inputFile = HadoopInputFile.fromPath(new Path(path), conf);
+    try (TransParquetFileReader reader = new TransParquetFileReader(inputFile, 
readOptions)) {
+      ParquetMetadata metadata = reader.getFooter();
+      for (BlockMetaData blockMetaData: metadata.getBlocks()) {
+        for (ColumnChunkMetaData columnChunkMetaData : 
blockMetaData.getColumns()) {
+          BloomFilter bloomFilter = 
reader.readBloomFilter(columnChunkMetaData);
+          if (bloomFilter != null) {
+            List<BloomFilter> bloomFilterList =
+              allBloomFilters.getOrDefault(columnChunkMetaData.getPath(), new 
ArrayList<>());
+            bloomFilterList.add(bloomFilter);
+            allBloomFilters.put(columnChunkMetaData.getPath(), 
bloomFilterList);
+          }
+        }
+      }
+    }
+
+    return allBloomFilters;
+  }
 }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index bea744cf5..951899bb3 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -52,6 +52,7 @@ public class TestFileBuilder
     private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
     private Boolean footerEncryption = false;
     private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
+    private String[] bloomFilterEnabled = {};
 
     public TestFileBuilder(Configuration conf, MessageType schema)
     {
@@ -114,6 +115,12 @@ public class TestFileBuilder
         return this;
     }
 
+    public TestFileBuilder withBloomFilterEnabled(String[] bloomFilterEnabled)
+    {
+        this.bloomFilterEnabled = bloomFilterEnabled;
+        return this;
+    }
+
     public EncryptionTestFile build()
             throws IOException
     {
@@ -129,6 +136,11 @@ public class TestFileBuilder
                 .withRowGroupSize(rowGroupSize)
                 .withEncryption(encryptionProperties)
                 .withCompressionCodec(CompressionCodecName.valueOf(codec));
+
+        for (String columnPath: bloomFilterEnabled) {
+          builder.withBloomFilterEnabled(columnPath, true);
+        }
+
         try (ParquetWriter writer = builder.build()) {
             for (int i = 0; i < fileContent.length; i++) {
                 writer.write(fileContent[i]);

Reply via email to