This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new eface4db2 PARQUET-2373: Improve performance with bloom_filter_length 
(#1184)
eface4db2 is described below

commit eface4db2e961a0174d4d1cd809134262d8c4c76
Author: Jiashen zhang <[email protected]>
AuthorDate: Sat Jan 27 08:04:33 2024 -0800

    PARQUET-2373: Improve performance with bloom_filter_length (#1184)
---
 .../format/converter/ParquetMetadataConverter.java |   7 +
 .../apache/parquet/hadoop/ParquetFileReader.java   |  22 +-
 .../apache/parquet/hadoop/ParquetFileWriter.java   |   3 +
 .../hadoop/metadata/ColumnChunkMetaData.java       |  22 +-
 .../converter/TestParquetMetadataConverter.java    |  25 +++
 .../parquet/hadoop/TestInteropBloomFilter.java     | 225 +++++++++++++++++++++
 6 files changed, 298 insertions(+), 6 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 47ab0649d..350bc3794 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -563,6 +563,10 @@ public class ParquetMetadataConverter {
       if (bloomFilterOffset >= 0) {
         metaData.setBloom_filter_offset(bloomFilterOffset);
       }
+      int bloomFilterLength = columnMetaData.getBloomFilterLength();
+      if (bloomFilterLength >= 0) {
+        metaData.setBloom_filter_length(bloomFilterLength);
+      }
       if (columnMetaData.getStatistics() != null
           && !columnMetaData.getStatistics().isEmpty()) {
         metaData.setStatistics(
@@ -1689,6 +1693,9 @@ public class ParquetMetadataConverter {
             if (metaData.isSetBloom_filter_offset()) {
               column.setBloomFilterOffset(metaData.getBloom_filter_offset());
             }
+            if (metaData.isSetBloom_filter_length()) {
+              column.setBloomFilterLength(metaData.getBloom_filter_length());
+            }
           } else { // column encrypted with column key
             // Metadata will be decrypted later, if this column is accessed
             EncryptionWithColumnKey columnKeyStruct = 
cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 91be68ed6..19403c329 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -32,6 +32,7 @@ import static 
org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static 
org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static 
org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -1442,11 +1443,24 @@ public class ParquetFileReader implements Closeable {
       }
     }
 
-    // Read Bloom filter data header.
+    // Seek to Bloom filter offset.
     f.seek(bloomFilterOffset);
+
+    // Read Bloom filter length.
+    int bloomFilterLength = meta.getBloomFilterLength();
+
+    // If it is set, read Bloom filter header and bitset together.
+    // Otherwise, read Bloom filter header first and then bitset.
+    InputStream in = f;
+    if (bloomFilterLength > 0) {
+      byte[] headerAndBitSet = new byte[bloomFilterLength];
+      f.readFully(headerAndBitSet);
+      in = new ByteArrayInputStream(headerAndBitSet);
+    }
+
     BloomFilterHeader bloomFilterHeader;
     try {
-      bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, 
bloomFilterHeaderAAD);
+      bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor, 
bloomFilterHeaderAAD);
     } catch (IOException e) {
       LOG.warn("read no bloom filter");
       return null;
@@ -1472,9 +1486,9 @@ public class ParquetFileReader implements Closeable {
     byte[] bitset;
     if (null == bloomFilterDecryptor) {
       bitset = new byte[numBytes];
-      f.readFully(bitset);
+      in.read(bitset);
     } else {
-      bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+      bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
       if (bitset.length != numBytes) {
         throw new ParquetCryptoRuntimeException("Wrong length of decrypted 
bloom filter bitset");
       }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 0ade8956a..57700d494 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1620,6 +1620,9 @@ public class ParquetFileWriter {
           serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, 
bloomFilterBitsetAAD);
         }
         out.write(serializedBitset);
+
+        int length = (int) (out.getPos() - offset);
+        column.setBloomFilterLength(length);
       }
     }
   }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 966367dae..62564c797 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -260,6 +260,7 @@ public abstract class ColumnChunkMetaData {
   private IndexReference offsetIndexReference;
 
   private long bloomFilterOffset = -1;
+  private int bloomFilterLength = -1;
 
   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
@@ -383,8 +384,14 @@ public abstract class ColumnChunkMetaData {
   }
 
   /**
-   * Method should be considered private
-   *
+   * @param bloomFilterLength
+   *          the length of the Bloom filter (header and bitset) in bytes
+   */
+  public void setBloomFilterLength(int bloomFilterLength) {
+    this.bloomFilterLength = bloomFilterLength;
+  }
+
+  /**
    * @return the offset to the Bloom filter or {@code -1} if there is no bloom 
filter for this column chunk
    */
   public long getBloomFilterOffset() {
@@ -392,6 +399,14 @@ public abstract class ColumnChunkMetaData {
     return bloomFilterOffset;
   }
 
+  /**
+   * @return the length of the Bloom filter or {@code -1} if there is no bloom 
filter length for this column chunk
+   */
+  public int getBloomFilterLength() {
+    decryptIfNeeded();
+    return bloomFilterLength;
+  }
+
   /**
    * @return all the encodings used in this column
    */
@@ -693,6 +708,9 @@ class EncryptedColumnChunkMetaData extends 
ColumnChunkMetaData {
     if (metaData.isSetBloom_filter_offset()) {
       setBloomFilterOffset(metaData.getBloom_filter_offset());
     }
+    if (metaData.isSetBloom_filter_length()) {
+      setBloomFilterLength(metaData.getBloom_filter_length());
+    }
   }
 
   @Override
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 3be9f6fde..d03b55172 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -270,6 +270,31 @@ public class TestParquetMetadataConverter {
         1234, 
convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
   }
 
+  @Test
+  public void testBloomFilterLength() throws IOException {
+    ParquetMetadata origMetaData = createParquetMetaData(null, Encoding.PLAIN);
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+
+    // Without bloom filter length
+    FileMetaData footer = converter.toParquetMetadata(1, origMetaData);
+    assertFalse(
+        
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    ParquetMetadata convertedMetaData = converter.fromParquetMetadata(footer);
+    
assertTrue(convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength()
 < 0);
+
+    // With bloom filter length
+    
origMetaData.getBlocks().get(0).getColumns().get(0).setBloomFilterLength(1024);
+    footer = converter.toParquetMetadata(1, origMetaData);
+    assertTrue(
+        
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+    assertEquals(
+        1024,
+        
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().getBloom_filter_length());
+    convertedMetaData = converter.fromParquetMetadata(footer);
+    assertEquals(
+        1024, 
convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength());
+  }
+
   @Test
   public void testLogicalTypesBackwardCompatibleWithConvertedTypes() {
     ParquetMetadataConverter parquetMetadataConverter = new 
ParquetMetadataConverter();
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
new file mode 100644
index 000000000..5b7ae6a85
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestInteropBloomFilter {
+
+  // The link includes a reference to a specific commit. To take a newer 
version - update this link.
+  private static final String PARQUET_TESTING_REPO = 
"https://github.com/apache/parquet-testing/raw/d69d979/data/";
+  private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+  // parquet-testing: https://github.com/apache/parquet-testing/pull/22
+  private static String DATA_INDEX_BLOOM_FILE = 
"data_index_bloom_encoding_stats.parquet";
+  // parquet-testing: https://github.com/apache/parquet-testing/pull/43
+  private static String DATA_INDEX_BLOOM_WITH_LENGTH_FILE = 
"data_index_bloom_encoding_with_length.parquet";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestInteropBloomFilter.class);
+  private OkHttpClient httpClient = new OkHttpClient();
+
+  @Test
+  public void testReadDataIndexBloomParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testReadDataIndexBloomParquetFiles {} ========", 
rootPath);
+
+    Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_FILE, 
httpClient);
+
+    int expectedRowCount = 14;
+    String[] expectedValues = new String[] {
+      "Hello",
+      "This is",
+      "a",
+      "test",
+      "How",
+      "are you",
+      "doing ",
+      "today",
+      "the quick",
+      "brown fox",
+      "jumps",
+      "over",
+      "the lazy",
+      "dog"
+    };
+
+    String[] unexpectedValues = new String[] {"b", "c", "d"};
+
+    try (ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
+      for (int i = 0; i < expectedRowCount; ++i) {
+        Group group = reader.read();
+        if (group == null) {
+          fail("Should not reach end of file");
+        }
+        assertEquals(expectedValues[i], group.getString(0, 0));
+      }
+    }
+
+    ParquetFileReader reader = new ParquetFileReader(
+        HadoopInputFile.fromPath(filePath, new Configuration()),
+        ParquetReadOptions.builder().build());
+    List<BlockMetaData> blocks = reader.getRowGroups();
+    blocks.forEach(block -> {
+      try {
+        assertEquals(14, block.getRowCount());
+        ColumnChunkMetaData idMeta = block.getColumns().get(0);
+        BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+        Assert.assertNotNull(bloomFilter);
+        assertEquals(192, idMeta.getBloomFilterOffset());
+        assertEquals(-1, idMeta.getBloomFilterLength());
+        for (int i = 0; i < expectedRowCount; ++i) {
+          
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+        }
+        for (int i = 0; i < unexpectedValues.length; ++i) {
+          
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+        }
+        assertEquals(152, idMeta.getTotalSize());
+        assertEquals(163, idMeta.getTotalUncompressedSize());
+        assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
+        assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+        assertEquals(156, idMeta.getColumnIndexReference().getOffset());
+        assertEquals(25, idMeta.getColumnIndexReference().getLength());
+      } catch (IOException e) {
+        fail("Should not throw exception: " + e.getMessage());
+      }
+    });
+  }
+
+  @Test
+  public void testReadDataIndexBloomWithLengthParquetFiles() throws 
IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testReadDataIndexBloomWithLengthParquetFiles {} 
========", rootPath);
+
+    Path filePath = downloadInterOpFiles(rootPath, 
DATA_INDEX_BLOOM_WITH_LENGTH_FILE, httpClient);
+
+    int expectedRowCount = 14;
+    String[] expectedValues = new String[] {
+      "Hello",
+      "This is",
+      "a",
+      "test",
+      "How",
+      "are you",
+      "doing ",
+      "today",
+      "the quick",
+      "brown fox",
+      "jumps",
+      "over",
+      "the lazy",
+      "dog"
+    };
+
+    String[] unexpectedValues = new String[] {"b", "c", "d"};
+
+    try (ParquetReader<Group> reader =
+        ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
+      for (int i = 0; i < expectedRowCount; ++i) {
+        Group group = reader.read();
+        if (group == null) {
+          fail("Should not reach end of file");
+        }
+        assertEquals(expectedValues[i], group.getString(0, 0));
+      }
+    }
+
+    ParquetFileReader reader = new ParquetFileReader(
+        HadoopInputFile.fromPath(filePath, new Configuration()),
+        ParquetReadOptions.builder().build());
+    List<BlockMetaData> blocks = reader.getRowGroups();
+    blocks.forEach(block -> {
+      try {
+        assertEquals(14, block.getRowCount());
+        ColumnChunkMetaData idMeta = block.getColumns().get(0);
+        BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+        Assert.assertNotNull(bloomFilter);
+        assertEquals(253, idMeta.getBloomFilterOffset());
+        assertEquals(2064, idMeta.getBloomFilterLength());
+        for (int i = 0; i < expectedRowCount; ++i) {
+          
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+        }
+        for (int i = 0; i < unexpectedValues.length; ++i) {
+          
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+        }
+        assertEquals(199, idMeta.getTotalSize());
+        assertEquals(199, idMeta.getTotalUncompressedSize());
+        assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
+        assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+        assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
+        assertEquals(25, idMeta.getColumnIndexReference().getLength());
+      } catch (Exception e) {
+        fail("Should not throw exception: " + e.getMessage());
+      }
+    });
+  }
+
+  private Path downloadInterOpFiles(Path rootPath, String fileName, 
OkHttpClient httpClient) throws IOException {
+    LOG.info("Download interOp files if needed");
+    Configuration conf = new Configuration();
+    FileSystem fs = rootPath.getFileSystem(conf);
+    LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+    if (!fs.exists(rootPath)) {
+      LOG.info("Create folder for interOp files: " + rootPath);
+      if (!fs.mkdirs(rootPath)) {
+        throw new IOException("Cannot create path " + rootPath);
+      }
+    }
+
+    Path file = new Path(rootPath, fileName);
+    if (!fs.exists(file)) {
+      String downloadUrl = PARQUET_TESTING_REPO + fileName;
+      LOG.info("Download interOp file: " + downloadUrl);
+      Request request = new Request.Builder().url(downloadUrl).build();
+      Response response = httpClient.newCall(request).execute();
+      if (!response.isSuccessful()) {
+        throw new IOException("Failed to download file: " + response);
+      }
+      try (FSDataOutputStream fdos = fs.create(file)) {
+        fdos.write(response.body().bytes());
+      }
+    }
+    return file;
+  }
+}

Reply via email to