This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dbb5cc429 Parquet: Use compatible column name to set Parquet bloom filter (#11799)
3dbb5cc429 is described below

commit 3dbb5cc429e1a52555dddbac62a6087bd651cc5c
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Jan 10 07:22:02 2025 -0800

    Parquet: Use compatible column name to set Parquet bloom filter (#11799)
---
 .../java/org/apache/iceberg/parquet/Parquet.java   | 74 +++++++++++++++-------
 .../iceberg/parquet/TestBloomRowGroupFilter.java   | 32 ++++++++--
 2 files changed, 77 insertions(+), 29 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index a3a5931832..310435209b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -95,6 +96,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@ import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
   private Parquet() {}
 
   private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,43 @@ public class Parquet {
       return this;
     }
 
+    private <T> void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField fieldId = schema.findField(colPath);
+                if (fieldId == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: 
{}", colPath);
+                  return;
+                }
+
+                String parquetColumnPath = 
fieldIdToParquetPath.get(fieldId.fieldId());
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: 
{}", fieldId);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, 
Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, 
Double.parseDouble(fpp));
+                }
+              });
+    }
+
     public <D> FileAppender<D> build() throws IOException {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be 
null");
@@ -285,8 +328,6 @@ public class Parquet {
       int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      Map<String, String> columnBloomFilterFpp = 
context.columnBloomFilterFpp();
-      Map<String, String> columnBloomFilterEnabled = 
context.columnBloomFilterEnabled();
       boolean dictionaryEnabled = context.dictionaryEnabled();
 
       if (compressionLevel != null) {
@@ -343,17 +384,8 @@ public class Parquet {
                 .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
                 .withMaxBloomFilterBytes(bloomFilterMaxBytes);
 
-        for (Map.Entry<String, String> entry : 
columnBloomFilterEnabled.entrySet()) {
-          String colPath = entry.getKey();
-          String bloomEnabled = entry.getValue();
-          propsBuilder.withBloomFilterEnabled(colPath, 
Boolean.parseBoolean(bloomEnabled));
-        }
-
-        for (Map.Entry<String, String> entry : 
columnBloomFilterFpp.entrySet()) {
-          String colPath = entry.getKey();
-          String fpp = entry.getValue();
-          propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
-        }
+        setBloomFilterConfig(
+            context, type, propsBuilder::withBloomFilterEnabled, 
propsBuilder::withBloomFilterFPP);
 
         ParquetProperties parquetProperties = propsBuilder.build();
 
@@ -386,17 +418,11 @@ public class Parquet {
                 .withDictionaryPageSize(dictionaryPageSize)
                 .withEncryption(fileEncryptionProperties);
 
-        for (Map.Entry<String, String> entry : 
columnBloomFilterEnabled.entrySet()) {
-          String colPath = entry.getKey();
-          String bloomEnabled = entry.getValue();
-          parquetWriteBuilder.withBloomFilterEnabled(colPath, 
Boolean.parseBoolean(bloomEnabled));
-        }
-
-        for (Map.Entry<String, String> entry : 
columnBloomFilterFpp.entrySet()) {
-          String colPath = entry.getKey();
-          String fpp = entry.getValue();
-          parquetWriteBuilder.withBloomFilterFPP(colPath, 
Double.parseDouble(fpp));
-        }
+        setBloomFilterConfig(
+            context,
+            type,
+            parquetWriteBuilder::withBloomFilterEnabled,
+            parquetWriteBuilder::withBloomFilterFPP);
 
         return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), 
metricsConfig);
       }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 62f330f9f5..bfa511c912 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -111,7 +111,8 @@ public class TestBloomRowGroupFilter {
           optional(24, "binary", Types.BinaryType.get()),
           optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
           optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
-          optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
+          optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)),
+          optional(28, "incompatible-name", Types.DecimalType.of(8, 2)));
 
   private static final Types.StructType UNDERSCORE_STRUCT_FIELD_TYPE =
       Types.StructType.of(Types.NestedField.required(16, "_int_field", 
IntegerType.get()));
@@ -142,7 +143,8 @@ public class TestBloomRowGroupFilter {
           optional(24, "_binary", Types.BinaryType.get()),
           optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
           optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
-          optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
+          optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)),
+          optional(28, "_incompatible-name", Types.DecimalType.of(8, 2)));
 
   private static final String TOO_LONG_FOR_STATS;
 
@@ -193,6 +195,7 @@ public class TestBloomRowGroupFilter {
 
     // build struct field schema
     org.apache.avro.Schema structSchema = 
AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
+    String compatibleFieldName = "_incompatible_x2Dname";
 
     OutputFile outFile = Files.localOutput(temp);
     try (FileAppender<Record> appender =
@@ -224,6 +227,7 @@ public class TestBloomRowGroupFilter {
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal", 
"true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal", 
"true")
             .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + 
"_fixed_decimal", "true")
+            .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + 
"_incompatible-name", "true")
             .build()) {
       GenericRecordBuilder builder = new 
GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
       // create 50 records
@@ -259,6 +263,7 @@ public class TestBloomRowGroupFilter {
         builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
         builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 + 
i)));
         builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 + 
i)));
+        builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 + 
i)));
 
         appender.add(builder.build());
       }
@@ -683,7 +688,7 @@ public class TestBloomRowGroupFilter {
   }
 
   @Test
-  public void testIntDeciamlEq() {
+  public void testIntDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
@@ -699,7 +704,7 @@ public class TestBloomRowGroupFilter {
   }
 
   @Test
-  public void testLongDeciamlEq() {
+  public void testLongDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
@@ -715,7 +720,7 @@ public class TestBloomRowGroupFilter {
   }
 
   @Test
-  public void testFixedDeciamlEq() {
+  public void testFixedDecimalEq() {
     for (int i = 0; i < INT_VALUE_COUNT; i++) {
       boolean shouldRead =
           new ParquetBloomRowGroupFilter(
@@ -1189,4 +1194,21 @@ public class TestBloomRowGroupFilter {
         .as("Should read: filter contains non-reference evaluate as True")
         .isTrue();
   }
+
+  @Test
+  public void testIncompatibleColumnNameEq() {
+    for (int i = 0; i < INT_VALUE_COUNT; i++) {
+      boolean shouldRead =
+          new ParquetBloomRowGroupFilter(
+                  SCHEMA, equal("incompatible-name", new 
BigDecimal(String.valueOf(77.77 + i))))
+              .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+      assertThat(shouldRead).as("Should read: decimal within range").isTrue();
+    }
+
+    boolean shouldRead =
+        new ParquetBloomRowGroupFilter(
+                SCHEMA, equal("incompatible-name", new BigDecimal("1234.56")))
+            .shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+    assertThat(shouldRead).as("Should not read: decimal outside 
range").isFalse();
+  }
 }

Reply via email to