This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e456506a77 Orc: Support row group bloom filters (#5313)
e456506a77 is described below

commit e456506a777b2a6168dbbdce5cf1e277956ad248
Author: Luning (Lucas) Wang <[email protected]>
AuthorDate: Thu Oct 20 22:47:36 2022 +0800

    Orc: Support row group bloom filters (#5313)
---
 build.gradle                                       |   1 +
 .../java/org/apache/iceberg/TableProperties.java   |   7 +
 docs/configuration.md                              |   2 +
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  |  55 +++++++-
 .../org/apache/iceberg/orc/TestBloomFilter.java    | 149 +++++++++++++++++++++
 5 files changed, 211 insertions(+), 3 deletions(-)

diff --git a/build.gradle b/build.gradle
index 2a413004cb..026bc3ddd0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -545,6 +545,7 @@ project(':iceberg-orc') {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(':iceberg-common')
+    testImplementation 'org.apache.orc:orc-tools'
   }
 }
 
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java 
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 771f53d527..027ed30b7d 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -177,6 +177,13 @@ public class TableProperties {
   public static final String AVRO_COMPRESSION_LEVEL_DEFAULT = null;
 
   public static final String ORC_STRIPE_SIZE_BYTES = 
"write.orc.stripe-size-bytes";
+
+  public static final String ORC_BLOOM_FILTER_COLUMNS = 
"write.orc.bloom.filter.columns";
+  public static final String ORC_BLOOM_FILTER_COLUMNS_DEFAULT = "";
+
+  public static final String ORC_BLOOM_FILTER_FPP = 
"write.orc.bloom.filter.fpp";
+  public static final double ORC_BLOOM_FILTER_FPP_DEFAULT = 0.05;
+
   public static final String DELETE_ORC_STRIPE_SIZE_BYTES = 
"write.delete.orc.stripe-size-bytes";
   public static final long ORC_STRIPE_SIZE_BYTES_DEFAULT = 64L * 1024 * 1024; 
// 64 MB
 
diff --git a/docs/configuration.md b/docs/configuration.md
index f0f0462f94..93f2d0b700 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -64,6 +64,8 @@ Iceberg tables support table properties to configure table 
behavior, like the de
 | write.orc.block-size-bytes         | 268435456 (256 MB) | Define the default 
file system block size for ORC files |
 | write.orc.compression-codec        | zlib               | ORC compression 
codec: zstd, lz4, lzo, zlib, snappy, none |
 | write.orc.compression-strategy     | speed              | ORC compression 
strategy: speed, compression |
+| write.orc.bloom.filter.columns     | (not set)          | Comma-separated 
list of column names for which a Bloom filter must be created |
+| write.orc.bloom.filter.fpp         | 0.05               | False positive 
probability for Bloom filter (must be > 0.0 and < 1.0) |
 | write.location-provider.impl       | null               | Optional custom 
implementation for LocationProvider  |
 | write.metadata.compression-codec   | none               | Metadata 
compression codec; none or gzip           |
 | write.metadata.metrics.default     | truncate(16)       | Default metrics 
mode for all columns in the table; none, counts, truncate(length), or full |
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java 
b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 79c83aec90..5b2b877a97 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -25,6 +25,10 @@ import static 
org.apache.iceberg.TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
 import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
+import static 
org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS_DEFAULT;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
+import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP_DEFAULT;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
 import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
@@ -202,6 +206,8 @@ public class ORC {
       OrcConf.COMPRESS.setString(conf, context.compressionKind().name());
       OrcConf.COMPRESSION_STRATEGY.setString(conf, 
context.compressionStrategy().name());
       OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);
+      OrcConf.BLOOM_FILTER_COLUMNS.setString(conf, 
context.bloomFilterColumns());
+      OrcConf.BLOOM_FILTER_FPP.setDouble(conf, context.bloomFilterFpp());
 
       return new OrcFileAppender<>(
           schema,
@@ -219,6 +225,8 @@ public class ORC {
       private final int vectorizedRowBatchSize;
       private final CompressionKind compressionKind;
       private final CompressionStrategy compressionStrategy;
+      private final String bloomFilterColumns;
+      private final double bloomFilterFpp;
 
       public long stripeSize() {
         return stripeSize;
@@ -240,17 +248,29 @@ public class ORC {
         return compressionStrategy;
       }
 
+      public String bloomFilterColumns() {
+        return bloomFilterColumns;
+      }
+
+      public double bloomFilterFpp() {
+        return bloomFilterFpp;
+      }
+
       private Context(
           long stripeSize,
           long blockSize,
           int vectorizedRowBatchSize,
           CompressionKind compressionKind,
-          CompressionStrategy compressionStrategy) {
+          CompressionStrategy compressionStrategy,
+          String bloomFilterColumns,
+          double bloomFilterFpp) {
         this.stripeSize = stripeSize;
         this.blockSize = blockSize;
         this.vectorizedRowBatchSize = vectorizedRowBatchSize;
         this.compressionKind = compressionKind;
         this.compressionStrategy = compressionStrategy;
+        this.bloomFilterColumns = bloomFilterColumns;
+        this.bloomFilterFpp = bloomFilterFpp;
       }
 
       static Context dataContext(Map<String, String> config) {
@@ -286,8 +306,31 @@ public class ORC {
             PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, 
strategyAsString);
         CompressionStrategy compressionStrategy = 
toCompressionStrategy(strategyAsString);
 
+        String bloomFilterColumns =
+            PropertyUtil.propertyAsString(
+                config,
+                OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(),
+                ORC_BLOOM_FILTER_COLUMNS_DEFAULT);
+        bloomFilterColumns =
+            PropertyUtil.propertyAsString(config, ORC_BLOOM_FILTER_COLUMNS, 
bloomFilterColumns);
+
+        double bloomFilterFpp =
+            PropertyUtil.propertyAsDouble(
+                config, OrcConf.BLOOM_FILTER_FPP.getAttribute(), 
ORC_BLOOM_FILTER_FPP_DEFAULT);
+        bloomFilterFpp =
+            PropertyUtil.propertyAsDouble(config, ORC_BLOOM_FILTER_FPP, 
bloomFilterFpp);
+        Preconditions.checkArgument(
+            bloomFilterFpp > 0.0 && bloomFilterFpp < 1.0,
+            "Bloom filter fpp must be > 0.0 and < 1.0");
+
         return new Context(
-            stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, 
compressionStrategy);
+            stripeSize,
+            blockSize,
+            vectorizedRowBatchSize,
+            compressionKind,
+            compressionStrategy,
+            bloomFilterColumns,
+            bloomFilterFpp);
       }
 
       static Context deleteContext(Map<String, String> config) {
@@ -318,7 +361,13 @@ public class ORC {
                 : dataContext.compressionStrategy();
 
         return new Context(
-            stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, 
compressionStrategy);
+            stripeSize,
+            blockSize,
+            vectorizedRowBatchSize,
+            compressionKind,
+            compressionStrategy,
+            dataContext.bloomFilterColumns(),
+            dataContext.bloomFilterFpp());
       }
 
       private static CompressionKind toCompressionKind(String codecAsString) {
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java 
b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java
new file mode 100644
index 0000000000..c27ce9b18a
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestBloomFilter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.apache.orc.impl.WriterImpl;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestBloomFilter {
+  private static final Schema DATA_SCHEMA =
+      new Schema(
+          required(100, "id", Types.LongType.get()),
+          required(101, "name", Types.StringType.get()),
+          required(102, "price", Types.DoubleType.get()));
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testWriteOption() throws Exception {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    OutputFile outFile = Files.localOutput(testFile);
+    try (FileAppender<Record> writer =
+        ORC.write(outFile)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .schema(DATA_SCHEMA)
+            .set("write.orc.bloom.filter.columns", "id,name")
+            .set("write.orc.bloom.filter.fpp", "0.04")
+            .build()) {
+
+      Class clazzOrcFileAppender = 
Class.forName("org.apache.iceberg.orc.OrcFileAppender");
+      Field writerField = clazzOrcFileAppender.getDeclaredField("writer");
+      writerField.setAccessible(true);
+      WriterImpl orcWriter = (WriterImpl) writerField.get(writer);
+
+      Class clazzWriterImpl = Class.forName("org.apache.orc.impl.WriterImpl");
+      Field bloomFilterColumnsField = 
clazzWriterImpl.getDeclaredField("bloomFilterColumns");
+      Field bloomFilterFppField = 
clazzWriterImpl.getDeclaredField("bloomFilterFpp");
+      bloomFilterColumnsField.setAccessible(true);
+      bloomFilterFppField.setAccessible(true);
+      boolean[] bloomFilterColumns = (boolean[]) 
bloomFilterColumnsField.get(orcWriter);
+      double bloomFilterFpp = (double) bloomFilterFppField.get(orcWriter);
+
+      // Validate whether the bloom filters are set in ORC SDK or not
+      Assert.assertTrue(bloomFilterColumns[1]);
+      Assert.assertTrue(bloomFilterColumns[2]);
+      Assert.assertEquals(0.04, bloomFilterFpp, 1e-15);
+
+      Record recordTemplate = GenericRecord.create(DATA_SCHEMA);
+      Record record1 = recordTemplate.copy("id", 1L, "name", "foo", "price", 
1.0);
+      Record record2 = recordTemplate.copy("id", 2L, "name", "bar", "price", 
2.0);
+      writer.add(record1);
+      writer.add(record2);
+    }
+
+    Class clazzFileDump = Class.forName("org.apache.orc.tools.FileDump");
+    Method getFormattedBloomFilters =
+        clazzFileDump.getDeclaredMethod(
+            "getFormattedBloomFilters",
+            int.class,
+            OrcIndex.class,
+            OrcFile.WriterVersion.class,
+            TypeDescription.Category.class,
+            OrcProto.ColumnEncoding.class);
+    getFormattedBloomFilters.setAccessible(true);
+
+    try (Reader reader =
+        OrcFile.createReader(
+            new Path(outFile.location()), new OrcFile.ReaderOptions(new 
Configuration())); ) {
+      boolean[] readCols = new boolean[] {false, true, true, false};
+      RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
+      OrcIndex indices = rows.readRowIndex(0, null, readCols);
+      StripeInformation stripe = reader.getStripes().get(0);
+      OrcProto.StripeFooter footer = rows.readStripeFooter(stripe);
+
+      String bloomFilterString =
+          (String)
+              getFormattedBloomFilters.invoke(
+                  null,
+                  1,
+                  indices,
+                  reader.getWriterVersion(),
+                  reader.getSchema().findSubtype(1).getCategory(),
+                  footer.getColumns(1));
+
+      // Validate whether the bloom filters are written to ORC files or not
+      Assert.assertTrue(bloomFilterString.contains("Bloom filters for 
column"));
+    }
+  }
+
+  @Test
+  public void testInvalidFppOption() throws Exception {
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+
+    try (FileAppender<Record> writer =
+        ORC.write(Files.localOutput(testFile))
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .schema(DATA_SCHEMA)
+            .set("write.orc.bloom.filter.columns", "id,name")
+            .set("write.orc.bloom.filter.fpp", "-1")
+            .build()) {
+      Assert.fail("Expected exception");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("Bloom filter fpp must be > 
0.0 and < 1.0"));
+    }
+  }
+}

Reply via email to