This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new eface4db2 PARQUET-2373: Improve performance with bloom_filter_length
(#1184)
eface4db2 is described below
commit eface4db2e961a0174d4d1cd809134262d8c4c76
Author: Jiashen zhang <[email protected]>
AuthorDate: Sat Jan 27 08:04:33 2024 -0800
PARQUET-2373: Improve performance with bloom_filter_length (#1184)
---
.../format/converter/ParquetMetadataConverter.java | 7 +
.../apache/parquet/hadoop/ParquetFileReader.java | 22 +-
.../apache/parquet/hadoop/ParquetFileWriter.java | 3 +
.../hadoop/metadata/ColumnChunkMetaData.java | 22 +-
.../converter/TestParquetMetadataConverter.java | 25 +++
.../parquet/hadoop/TestInteropBloomFilter.java | 225 +++++++++++++++++++++
6 files changed, 298 insertions(+), 6 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 47ab0649d..350bc3794 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -563,6 +563,10 @@ public class ParquetMetadataConverter {
if (bloomFilterOffset >= 0) {
metaData.setBloom_filter_offset(bloomFilterOffset);
}
+ int bloomFilterLength = columnMetaData.getBloomFilterLength();
+ if (bloomFilterLength >= 0) {
+ metaData.setBloom_filter_length(bloomFilterLength);
+ }
if (columnMetaData.getStatistics() != null
&& !columnMetaData.getStatistics().isEmpty()) {
metaData.setStatistics(
@@ -1689,6 +1693,9 @@ public class ParquetMetadataConverter {
if (metaData.isSetBloom_filter_offset()) {
column.setBloomFilterOffset(metaData.getBloom_filter_offset());
}
+ if (metaData.isSetBloom_filter_length()) {
+ column.setBloomFilterLength(metaData.getBloom_filter_length());
+ }
} else { // column encrypted with column key
// Metadata will be decrypted later, if this column is accessed
EncryptionWithColumnKey columnKeyStruct =
cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 91be68ed6..19403c329 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -32,6 +32,7 @@ import static
org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static
org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static
org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -1442,11 +1443,24 @@ public class ParquetFileReader implements Closeable {
}
}
- // Read Bloom filter data header.
+ // Seek to Bloom filter offset.
f.seek(bloomFilterOffset);
+
+ // Read Bloom filter length.
+ int bloomFilterLength = meta.getBloomFilterLength();
+
+ // If it is set, read Bloom filter header and bitset together.
+ // Otherwise, read Bloom filter header first and then bitset.
+ InputStream in = f;
+ if (bloomFilterLength > 0) {
+ byte[] headerAndBitSet = new byte[bloomFilterLength];
+ f.readFully(headerAndBitSet);
+ in = new ByteArrayInputStream(headerAndBitSet);
+ }
+
BloomFilterHeader bloomFilterHeader;
try {
- bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor,
bloomFilterHeaderAAD);
+ bloomFilterHeader = Util.readBloomFilterHeader(in, bloomFilterDecryptor,
bloomFilterHeaderAAD);
} catch (IOException e) {
LOG.warn("read no bloom filter");
return null;
@@ -1472,9 +1486,9 @@ public class ParquetFileReader implements Closeable {
byte[] bitset;
if (null == bloomFilterDecryptor) {
bitset = new byte[numBytes];
- f.readFully(bitset);
+ in.read(bitset);
} else {
- bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+ bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
if (bitset.length != numBytes) {
throw new ParquetCryptoRuntimeException("Wrong length of decrypted
bloom filter bitset");
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 0ade8956a..57700d494 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1620,6 +1620,9 @@ public class ParquetFileWriter {
serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset,
bloomFilterBitsetAAD);
}
out.write(serializedBitset);
+
+ int length = (int) (out.getPos() - offset);
+ column.setBloomFilterLength(length);
}
}
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 966367dae..62564c797 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -260,6 +260,7 @@ public abstract class ColumnChunkMetaData {
private IndexReference offsetIndexReference;
private long bloomFilterOffset = -1;
+ private int bloomFilterLength = -1;
protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
this(null, columnChunkProperties);
@@ -383,8 +384,14 @@ public abstract class ColumnChunkMetaData {
}
/**
- * Method should be considered private
- *
+ * @param bloomFilterLength
+ * the length in bytes of the Bloom filter (header and bitset)
+ */
+ public void setBloomFilterLength(int bloomFilterLength) {
+ this.bloomFilterLength = bloomFilterLength;
+ }
+
+ /**
* @return the offset to the Bloom filter or {@code -1} if there is no bloom
filter for this column chunk
*/
public long getBloomFilterOffset() {
@@ -392,6 +399,14 @@ public abstract class ColumnChunkMetaData {
return bloomFilterOffset;
}
+ /**
+ * @return the length of the Bloom filter or {@code -1} if there is no bloom
filter length for this column chunk
+ */
+ public int getBloomFilterLength() {
+ decryptIfNeeded();
+ return bloomFilterLength;
+ }
+
/**
* @return all the encodings used in this column
*/
@@ -693,6 +708,9 @@ class EncryptedColumnChunkMetaData extends
ColumnChunkMetaData {
if (metaData.isSetBloom_filter_offset()) {
setBloomFilterOffset(metaData.getBloom_filter_offset());
}
+ if (metaData.isSetBloom_filter_length()) {
+ setBloomFilterLength(metaData.getBloom_filter_length());
+ }
}
@Override
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 3be9f6fde..d03b55172 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -270,6 +270,31 @@ public class TestParquetMetadataConverter {
1234,
convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
}
+ @Test
+ public void testBloomFilterLength() throws IOException {
+ ParquetMetadata origMetaData = createParquetMetaData(null, Encoding.PLAIN);
+ ParquetMetadataConverter converter = new ParquetMetadataConverter();
+
+ // Without bloom filter length
+ FileMetaData footer = converter.toParquetMetadata(1, origMetaData);
+ assertFalse(
+
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+ ParquetMetadata convertedMetaData = converter.fromParquetMetadata(footer);
+
assertTrue(convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength()
< 0);
+
+ // With bloom filter length
+
origMetaData.getBlocks().get(0).getColumns().get(0).setBloomFilterLength(1024);
+ footer = converter.toParquetMetadata(1, origMetaData);
+ assertTrue(
+
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().isSetBloom_filter_length());
+ assertEquals(
+ 1024,
+
footer.getRow_groups().get(0).getColumns().get(0).getMeta_data().getBloom_filter_length());
+ convertedMetaData = converter.fromParquetMetadata(footer);
+ assertEquals(
+ 1024,
convertedMetaData.getBlocks().get(0).getColumns().get(0).getBloomFilterLength());
+ }
+
@Test
public void testLogicalTypesBackwardCompatibleWithConvertedTypes() {
ParquetMetadataConverter parquetMetadataConverter = new
ParquetMetadataConverter();
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
new file mode 100644
index 000000000..5b7ae6a85
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInteropBloomFilter.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestInteropBloomFilter {
+
+ // The link includes a reference to a specific commit. To take a newer
version - update this link.
+ private static final String PARQUET_TESTING_REPO =
"https://github.com/apache/parquet-testing/raw/d69d979/data/";
+ private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
+ // parquet-testing: https://github.com/apache/parquet-testing/pull/22
+ private static String DATA_INDEX_BLOOM_FILE =
"data_index_bloom_encoding_stats.parquet";
+ // parquet-testing: https://github.com/apache/parquet-testing/pull/43
+ private static String DATA_INDEX_BLOOM_WITH_LENGTH_FILE =
"data_index_bloom_encoding_with_length.parquet";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestInteropBloomFilter.class);
+ private OkHttpClient httpClient = new OkHttpClient();
+
+ @Test
+ public void testReadDataIndexBloomParquetFiles() throws IOException {
+ Path rootPath = new Path(PARQUET_TESTING_PATH);
+ LOG.info("======== testReadDataIndexBloomParquetFiles {} ========",
rootPath);
+
+ Path filePath = downloadInterOpFiles(rootPath, DATA_INDEX_BLOOM_FILE,
httpClient);
+
+ int expectedRowCount = 14;
+ String[] expectedValues = new String[] {
+ "Hello",
+ "This is",
+ "a",
+ "test",
+ "How",
+ "are you",
+ "doing ",
+ "today",
+ "the quick",
+ "brown fox",
+ "jumps",
+ "over",
+ "the lazy",
+ "dog"
+ };
+
+ String[] unexpectedValues = new String[] {"b", "c", "d"};
+
+ try (ParquetReader<Group> reader =
+ ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
+ for (int i = 0; i < expectedRowCount; ++i) {
+ Group group = reader.read();
+ if (group == null) {
+ fail("Should not reach end of file");
+ }
+ assertEquals(expectedValues[i], group.getString(0, 0));
+ }
+ }
+
+ ParquetFileReader reader = new ParquetFileReader(
+ HadoopInputFile.fromPath(filePath, new Configuration()),
+ ParquetReadOptions.builder().build());
+ List<BlockMetaData> blocks = reader.getRowGroups();
+ blocks.forEach(block -> {
+ try {
+ assertEquals(14, block.getRowCount());
+ ColumnChunkMetaData idMeta = block.getColumns().get(0);
+ BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+ Assert.assertNotNull(bloomFilter);
+ assertEquals(192, idMeta.getBloomFilterOffset());
+ assertEquals(-1, idMeta.getBloomFilterLength());
+ for (int i = 0; i < expectedRowCount; ++i) {
+
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+ }
+ for (int i = 0; i < unexpectedValues.length; ++i) {
+
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+ }
+ assertEquals(152, idMeta.getTotalSize());
+ assertEquals(163, idMeta.getTotalUncompressedSize());
+ assertEquals(181, idMeta.getOffsetIndexReference().getOffset());
+ assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+ assertEquals(156, idMeta.getColumnIndexReference().getOffset());
+ assertEquals(25, idMeta.getColumnIndexReference().getLength());
+ } catch (IOException e) {
+ fail("Should not throw exception: " + e.getMessage());
+ }
+ });
+ }
+
+ @Test
+ public void testReadDataIndexBloomWithLengthParquetFiles() throws
IOException {
+ Path rootPath = new Path(PARQUET_TESTING_PATH);
+ LOG.info("======== testReadDataIndexBloomWithLengthParquetFiles {}
========", rootPath);
+
+ Path filePath = downloadInterOpFiles(rootPath,
DATA_INDEX_BLOOM_WITH_LENGTH_FILE, httpClient);
+
+ int expectedRowCount = 14;
+ String[] expectedValues = new String[] {
+ "Hello",
+ "This is",
+ "a",
+ "test",
+ "How",
+ "are you",
+ "doing ",
+ "today",
+ "the quick",
+ "brown fox",
+ "jumps",
+ "over",
+ "the lazy",
+ "dog"
+ };
+
+ String[] unexpectedValues = new String[] {"b", "c", "d"};
+
+ try (ParquetReader<Group> reader =
+ ParquetReader.builder(new GroupReadSupport(), filePath).build()) {
+ for (int i = 0; i < expectedRowCount; ++i) {
+ Group group = reader.read();
+ if (group == null) {
+ fail("Should not reach end of file");
+ }
+ assertEquals(expectedValues[i], group.getString(0, 0));
+ }
+ }
+
+ ParquetFileReader reader = new ParquetFileReader(
+ HadoopInputFile.fromPath(filePath, new Configuration()),
+ ParquetReadOptions.builder().build());
+ List<BlockMetaData> blocks = reader.getRowGroups();
+ blocks.forEach(block -> {
+ try {
+ assertEquals(14, block.getRowCount());
+ ColumnChunkMetaData idMeta = block.getColumns().get(0);
+ BloomFilter bloomFilter = reader.readBloomFilter(idMeta);
+ Assert.assertNotNull(bloomFilter);
+ assertEquals(253, idMeta.getBloomFilterOffset());
+ assertEquals(2064, idMeta.getBloomFilterLength());
+ for (int i = 0; i < expectedRowCount; ++i) {
+
assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(expectedValues[i]))));
+ }
+ for (int i = 0; i < unexpectedValues.length; ++i) {
+
assertFalse(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(unexpectedValues[i]))));
+ }
+ assertEquals(199, idMeta.getTotalSize());
+ assertEquals(199, idMeta.getTotalUncompressedSize());
+ assertEquals(2342, idMeta.getOffsetIndexReference().getOffset());
+ assertEquals(11, idMeta.getOffsetIndexReference().getLength());
+ assertEquals(2317, idMeta.getColumnIndexReference().getOffset());
+ assertEquals(25, idMeta.getColumnIndexReference().getLength());
+ } catch (Exception e) {
+ fail("Should not throw exception: " + e.getMessage());
+ }
+ });
+ }
+
+ private Path downloadInterOpFiles(Path rootPath, String fileName,
OkHttpClient httpClient) throws IOException {
+ LOG.info("Download interOp files if needed");
+ Configuration conf = new Configuration();
+ FileSystem fs = rootPath.getFileSystem(conf);
+ LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
+ if (!fs.exists(rootPath)) {
+ LOG.info("Create folder for interOp files: " + rootPath);
+ if (!fs.mkdirs(rootPath)) {
+ throw new IOException("Cannot create path " + rootPath);
+ }
+ }
+
+ Path file = new Path(rootPath, fileName);
+ if (!fs.exists(file)) {
+ String downloadUrl = PARQUET_TESTING_REPO + fileName;
+ LOG.info("Download interOp file: " + downloadUrl);
+ Request request = new Request.Builder().url(downloadUrl).build();
+ Response response = httpClient.newCall(request).execute();
+ if (!response.isSuccessful()) {
+ throw new IOException("Failed to download file: " + response);
+ }
+ try (FSDataOutputStream fdos = fs.create(file)) {
+ fdos.write(response.body().bytes());
+ }
+ }
+ return file;
+ }
+}