This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e456506a77 Orc: Support row group bloom filters (#5313)
e456506a77 is described below
commit e456506a777b2a6168dbbdce5cf1e277956ad248
Author: Luning (Lucas) Wang <[email protected]>
AuthorDate: Thu Oct 20 22:47:36 2022 +0800
Orc: Support row group bloom filters (#5313)
---
build.gradle | 1 +
.../java/org/apache/iceberg/TableProperties.java | 7 +
docs/configuration.md | 2 +
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 55 +++++++-
.../org/apache/iceberg/orc/TestBloomFilter.java | 149 +++++++++++++++++++++
5 files changed, 211 insertions(+), 3 deletions(-)
diff --git a/build.gradle b/build.gradle
index 2a413004cb..026bc3ddd0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -545,6 +545,7 @@ project(':iceberg-orc') {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(':iceberg-common')
+ testImplementation 'org.apache.orc:orc-tools'
}
}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 771f53d527..027ed30b7d 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -177,6 +177,13 @@ public class TableProperties {
public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null;
public static final String ORC_STRIPE_SIZE_BYTES =
"write.orc.stripe-size-bytes";
+
+ public static final String ORC_BLOOM_FILTER_COLUMNS =
"write.orc.bloom.filter.columns";
+ public static final String ORC_BLOOM_FILTER_COLUMNS_DEFAULT = "";
+
+ public static final String ORC_BLOOM_FILTER_FPP =
"write.orc.bloom.filter.fpp";
+ public static final double ORC_BLOOM_FILTER_FPP_DEFAULT = 0.05;
+
public static final String DELETE_ORC_STRIPE_SIZE_BYTES =
"write.delete.orc.stripe-size-bytes";
public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024;
// 64 MB
diff --git a/docs/configuration.md b/docs/configuration.md
index f0f0462f94..93f2d0b700 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -64,6 +64,8 @@ Iceberg tables support table properties to configure table
behavior, like the de
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default
file system block size for ORC files |
| write.orc.compression-codec | zlib | ORC compression
codec: zstd, lz4, lzo, zlib, snappy, none |
| write.orc.compression-strategy | speed | ORC compression
strategy: speed, compression |
+| write.orc.bloom.filter.columns | (not set) | Comma separated
list of column names for which a Bloom filter must be created |
+| write.orc.bloom.filter.fpp | 0.05 | False positive
probability for Bloom filter (must be > 0.0 and < 1.0) |
| write.location-provider.impl | null | Optional custom
implementation for LocationProvider |
| write.metadata.compression-codec | none | Metadata
compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics
mode for all columns in the table; none, counts, truncate(length), or full |
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 79c83aec90..5b2b877a97 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -25,6 +25,10 @@ import static
org.apache.iceberg.TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
+import static
org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS_DEFAULT;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
@@ -202,6 +206,8 @@ public class ORC {
OrcConf.COMPRESS.setString(conf, context.compressionKind().name());
OrcConf.COMPRESSION_STRATEGY.setString(conf,
context.compressionStrategy().name());
OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);
+ OrcConf.BLOOM_FILTER_COLUMNS.setString(conf,
context.bloomFilterColumns());
+ OrcConf.BLOOM_FILTER_FPP.setDouble(conf, context.bloomFilterFpp());
return new OrcFileAppender<>(
schema,
@@ -219,6 +225,8 @@ public class ORC {
private final int vectorizedRowBatchSize;
private final CompressionKind compressionKind;
private final CompressionStrategy compressionStrategy;
+ private final String bloomFilterColumns;
+ private final double bloomFilterFpp;
public long stripeSize() {
return stripeSize;
@@ -240,17 +248,29 @@ public class ORC {
return compressionStrategy;
}
+ public String bloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ public double bloomFilterFpp() {
+ return bloomFilterFpp;
+ }
+
private Context(
long stripeSize,
long blockSize,
int vectorizedRowBatchSize,
CompressionKind compressionKind,
- CompressionStrategy compressionStrategy) {
+ CompressionStrategy compressionStrategy,
+ String bloomFilterColumns,
+ double bloomFilterFpp) {
this.stripeSize = stripeSize;
this.blockSize = blockSize;
this.vectorizedRowBatchSize = vectorizedRowBatchSize;
this.compressionKind = compressionKind;
this.compressionStrategy = compressionStrategy;
+ this.bloomFilterColumns = bloomFilterColumns;
+ this.bloomFilterFpp = bloomFilterFpp;
}
static Context dataContext(Map<String, String> config) {
@@ -286,8 +306,31 @@ public class ORC {
PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY,
strategyAsString);
CompressionStrategy compressionStrategy =
toCompressionStrategy(strategyAsString);
+ String bloomFilterColumns =
+ PropertyUtil.propertyAsString(
+ config,
+ OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(),
+ ORC_BLOOM_FILTER_COLUMNS_DEFAULT);
+ bloomFilterColumns =
+ PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS,
bloomFilterColumns);
+
+ double bloomFilterFpp =
+ PropertyUtil.propertyAsDouble(
+ config, OrcConf.BLOOM_FILTER_FPP.getAttribute(),
ORC_BLOOM_FILTER_FPP_DEFAULT);
+ bloomFilterFpp =
+ PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP,
bloomFilterFpp);
+ Preconditions.checkArgument(
+ bloomFilterFpp > 0.0 && bloomFilterFpp < 1.0,
+ "Bloom filter fpp must be > 0.0 and < 1.0");
+
return new Context(
- stripeSize, blockSize, vectorizedRowBatchSize, compressionKind,
compressionStrategy);
+ stripeSize,
+ blockSize,
+ vectorizedRowBatchSize,
+ compressionKind,
+ compressionStrategy,
+ bloomFilterColumns,
+ bloomFilterFpp);
}
static Context deleteContext(Map<String, String> config) {
@@ -318,7 +361,13 @@ public class ORC {
: dataContext.compressionStrategy();
return new Context(
- stripeSize, blockSize, vectorizedRowBatchSize, compressionKind,
compressionStrategy);
+ stripeSize,
+ blockSize,
+ vectorizedRowBatchSize,
+ compressionKind,
+ compressionStrategy,
+ dataContext.bloomFilterColumns(),
+ dataContext.bloomFilterFpp());
}
private static CompressionKind toCompressionKind(String codecAsString) {
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java
b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java
new file mode 100644
index 0000000000..c27ce9b18a
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.apache.orc.impl.WriterImpl;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestBloomFilter {
+ private static final Schema DATA_SCHEMA =
+ new Schema(
+ required(100, "id", Types.LongType.get()),
+ required(101, "name", Types.StringType.get()),
+ required(102, "price", Types.DoubleType.get()));
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testWriteOption() throws Exception {
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ OutputFile outFile = Files.localOutput(testFile);
+ try (FileAppender<Record> writer =
+ ORC.write(outFile)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .schema(DATA_SCHEMA)
+ .set("write.orc.bloom.filter.columns", "id,name")
+ .set("write.orc.bloom.filter.fpp", "0.04")
+ .build()) {
+
+ Class clazzOrcFileAppender =
Class.forName("org.apache.iceberg.orc.OrcFileAppender");
+ Field writerField = clazzOrcFileAppender.getDeclaredField("writer");
+ writerField.setAccessible(true);
+ WriterImpl orcWriter = (WriterImpl) writerField.get(writer);
+
+ Class clazzWriterImpl = Class.forName("org.apache.orc.impl.WriterImpl");
+ Field bloomFilterColumnsField =
clazzWriterImpl.getDeclaredField("bloomFilterColumns");
+ Field bloomFilterFppField =
clazzWriterImpl.getDeclaredField("bloomFilterFpp");
+ bloomFilterColumnsField.setAccessible(true);
+ bloomFilterFppField.setAccessible(true);
+ boolean[] bloomFilterColumns = (boolean[])
bloomFilterColumnsField.get(orcWriter);
+ double bloomFilterFpp = (double) bloomFilterFppField.get(orcWriter);
+
+ // Validate whether the bloom filters are set in the ORC SDK or not
+ Assert.assertTrue(bloomFilterColumns[1]);
+ Assert.assertTrue(bloomFilterColumns[2]);
+ Assert.assertEquals(0.04, bloomFilterFpp, 1e-15);
+
+ Record recordTemplate = GenericRecord.create(DATA_SCHEMA);
+ Record record1 = recordTemplate.copy("id", 1L, "name", "foo", "price",
1.0);
+ Record record2 = recordTemplate.copy("id", 2L, "name", "bar", "price",
2.0);
+ writer.add(record1);
+ writer.add(record2);
+ }
+
+ Class clazzFileDump = Class.forName("org.apache.orc.tools.FileDump");
+ Method getFormattedBloomFilters =
+ clazzFileDump.getDeclaredMethod(
+ "getFormattedBloomFilters",
+ int.class,
+ OrcIndex.class,
+ OrcFile.WriterVersion.class,
+ TypeDescription.Category.class,
+ OrcProto.ColumnEncoding.class);
+ getFormattedBloomFilters.setAccessible(true);
+
+ try (Reader reader =
+ OrcFile.createReader(
+ new Path(outFile.location()), new OrcFile.ReaderOptions(new
Configuration())); ) {
+ boolean[] readCols = new boolean[] {false, true, true, false};
+ RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
+ OrcIndex indices = rows.readRowIndex(0, null, readCols);
+ StripeInformation stripe = reader.getStripes().get(0);
+ OrcProto.StripeFooter footer = rows.readStripeFooter(stripe);
+
+ String bloomFilterString =
+ (String)
+ getFormattedBloomFilters.invoke(
+ null,
+ 1,
+ indices,
+ reader.getWriterVersion(),
+ reader.getSchema().findSubtype(1).getCategory(),
+ footer.getColumns(1));
+
+ // Validate whether the bloom filters are written to ORC files or not
+ Assert.assertTrue(bloomFilterString.contains("Bloom filters for
column"));
+ }
+ }
+
+ @Test
+ public void testInvalidFppOption() throws Exception {
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<Record> writer =
+ ORC.write(Files.localOutput(testFile))
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .schema(DATA_SCHEMA)
+ .set("write.orc.bloom.filter.columns", "id,name")
+ .set("write.orc.bloom.filter.fpp", "-1")
+ .build()) {
+ Assert.fail("Expected exception");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("Bloom filter fpp must be >
0.0 and < 1.0"));
+ }
+ }
+}