This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 3dbb5cc429 Parquet: Use compatible column name to set Parquet bloom filter (#11799)
3dbb5cc429 is described below
commit 3dbb5cc429e1a52555dddbac62a6087bd651cc5c
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Jan 10 07:22:02 2025 -0800
Parquet: Use compatible column name to set Parquet bloom filter (#11799)
---
.../java/org/apache/iceberg/parquet/Parquet.java | 74 +++++++++++++++-------
.../iceberg/parquet/TestBloomRowGroupFilter.java | 32 ++++++++--
2 files changed, 77 insertions(+), 29 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index a3a5931832..310435209b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -95,6 +96,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PropertyUtil;
@@ -115,8 +117,12 @@ import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Parquet {
+ private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
private Parquet() {}
private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
@@ -266,6 +272,43 @@ public class Parquet {
return this;
}
+    /**
+     * Applies per-column bloom filter settings to a Parquet builder, addressing each column by its
+     * path in {@code parquetSchema} instead of by the name used in the configuration.
+     *
+     * <p>Each key of {@code context.columnBloomFilterEnabled()} is resolved with
+     * {@code schema.findField}; the resulting field ID is then matched against the field IDs of
+     * the leaf columns of {@code parquetSchema}. Keys that cannot be resolved in either step are
+     * logged and skipped. A false-positive probability from {@code context.columnBloomFilterFpp()}
+     * is applied only for keys that also have an enabled/disabled entry.
+     *
+     * @param context write context holding the bloom filter column settings
+     * @param parquetSchema Parquet schema whose leaf column paths are handed to the callbacks
+     * @param withBloomFilterEnabled receives (Parquet column path, enabled flag)
+     * @param withBloomFilterFPP receives (Parquet column path, false-positive probability)
+     */
+    private void setBloomFilterConfig(
+        Context context,
+        MessageType parquetSchema,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      // Leaf columns without a field ID cannot be matched to a schema field, so they are left
+      // out of the lookup instead of failing on a null ID.
+      Map<Integer, String> fieldIdToParquetPath =
+          parquetSchema.getColumns().stream()
+              .filter(col -> col.getPrimitiveType().getId() != null)
+              .collect(
+                  Collectors.toMap(
+                      col -> col.getPrimitiveType().getId().intValue(),
+                      col -> String.join(".", col.getPath())));
+
+      // Fetched once; the lookup does not depend on the column being processed.
+      Map<String, String> columnBloomFilterFpp = context.columnBloomFilterFpp();
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                Types.NestedField field = schema.findField(colPath);
+                if (field == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                String parquetColumnPath = fieldIdToParquetPath.get(field.fieldId());
+                if (parquetColumnPath == null) {
+                  // The name resolved, but no leaf column carries this field ID.
+                  LOG.warn(
+                      "Skipping bloom filter config for field without a Parquet leaf column: {}",
+                      field);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = columnBloomFilterFpp.get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be
null");
@@ -285,8 +328,6 @@ public class Parquet {
int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
- Map<String, String> columnBloomFilterFpp =
context.columnBloomFilterFpp();
- Map<String, String> columnBloomFilterEnabled =
context.columnBloomFilterEnabled();
boolean dictionaryEnabled = context.dictionaryEnabled();
if (compressionLevel != null) {
@@ -343,17 +384,8 @@ public class Parquet {
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
.withMaxBloomFilterBytes(bloomFilterMaxBytes);
- for (Map.Entry<String, String> entry :
columnBloomFilterEnabled.entrySet()) {
- String colPath = entry.getKey();
- String bloomEnabled = entry.getValue();
- propsBuilder.withBloomFilterEnabled(colPath,
Boolean.parseBoolean(bloomEnabled));
- }
-
- for (Map.Entry<String, String> entry :
columnBloomFilterFpp.entrySet()) {
- String colPath = entry.getKey();
- String fpp = entry.getValue();
- propsBuilder.withBloomFilterFPP(colPath, Double.parseDouble(fpp));
- }
+ setBloomFilterConfig(
+ context, type, propsBuilder::withBloomFilterEnabled,
propsBuilder::withBloomFilterFPP);
ParquetProperties parquetProperties = propsBuilder.build();
@@ -386,17 +418,11 @@ public class Parquet {
.withDictionaryPageSize(dictionaryPageSize)
.withEncryption(fileEncryptionProperties);
- for (Map.Entry<String, String> entry :
columnBloomFilterEnabled.entrySet()) {
- String colPath = entry.getKey();
- String bloomEnabled = entry.getValue();
- parquetWriteBuilder.withBloomFilterEnabled(colPath,
Boolean.parseBoolean(bloomEnabled));
- }
-
- for (Map.Entry<String, String> entry :
columnBloomFilterFpp.entrySet()) {
- String colPath = entry.getKey();
- String fpp = entry.getValue();
- parquetWriteBuilder.withBloomFilterFPP(colPath,
Double.parseDouble(fpp));
- }
+ setBloomFilterConfig(
+ context,
+ type,
+ parquetWriteBuilder::withBloomFilterEnabled,
+ parquetWriteBuilder::withBloomFilterFPP);
return new ParquetWriteAdapter<>(parquetWriteBuilder.build(),
metricsConfig);
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 62f330f9f5..bfa511c912 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -111,7 +111,8 @@ public class TestBloomRowGroupFilter {
optional(24, "binary", Types.BinaryType.get()),
optional(25, "int_decimal", Types.DecimalType.of(8, 2)),
optional(26, "long_decimal", Types.DecimalType.of(14, 2)),
- optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)));
+ optional(27, "fixed_decimal", Types.DecimalType.of(31, 2)),
+ optional(28, "incompatible-name", Types.DecimalType.of(8, 2)));
private static final Types.StructType UNDERSCORE_STRUCT_FIELD_TYPE =
Types.StructType.of(Types.NestedField.required(16, "_int_field",
IntegerType.get()));
@@ -142,7 +143,8 @@ public class TestBloomRowGroupFilter {
optional(24, "_binary", Types.BinaryType.get()),
optional(25, "_int_decimal", Types.DecimalType.of(8, 2)),
optional(26, "_long_decimal", Types.DecimalType.of(14, 2)),
- optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)));
+ optional(27, "_fixed_decimal", Types.DecimalType.of(31, 2)),
+ optional(28, "_incompatible-name", Types.DecimalType.of(8, 2)));
private static final String TOO_LONG_FOR_STATS;
@@ -193,6 +195,7 @@ public class TestBloomRowGroupFilter {
// build struct field schema
org.apache.avro.Schema structSchema =
AvroSchemaUtil.convert(UNDERSCORE_STRUCT_FIELD_TYPE);
+ String compatibleFieldName = "_incompatible_x2Dname";
OutputFile outFile = Files.localOutput(temp);
try (FileAppender<Record> appender =
@@ -224,6 +227,7 @@ public class TestBloomRowGroupFilter {
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_int_decimal",
"true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long_decimal",
"true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX +
"_fixed_decimal", "true")
+ .set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX +
"_incompatible-name", "true")
.build()) {
GenericRecordBuilder builder = new
GenericRecordBuilder(convert(FILE_SCHEMA, "table"));
// create 50 records
@@ -259,6 +263,7 @@ public class TestBloomRowGroupFilter {
builder.set("_int_decimal", new BigDecimal(String.valueOf(77.77 + i)));
builder.set("_long_decimal", new BigDecimal(String.valueOf(88.88 +
i)));
builder.set("_fixed_decimal", new BigDecimal(String.valueOf(99.99 +
i)));
+ builder.set(compatibleFieldName, new BigDecimal(String.valueOf(77.77 +
i)));
appender.add(builder.build());
}
@@ -683,7 +688,7 @@ public class TestBloomRowGroupFilter {
}
@Test
- public void testIntDeciamlEq() {
+ public void testIntDecimalEq() {
for (int i = 0; i < INT_VALUE_COUNT; i++) {
boolean shouldRead =
new ParquetBloomRowGroupFilter(
@@ -699,7 +704,7 @@ public class TestBloomRowGroupFilter {
}
@Test
- public void testLongDeciamlEq() {
+ public void testLongDecimalEq() {
for (int i = 0; i < INT_VALUE_COUNT; i++) {
boolean shouldRead =
new ParquetBloomRowGroupFilter(
@@ -715,7 +720,7 @@ public class TestBloomRowGroupFilter {
}
@Test
- public void testFixedDeciamlEq() {
+ public void testFixedDecimalEq() {
for (int i = 0; i < INT_VALUE_COUNT; i++) {
boolean shouldRead =
new ParquetBloomRowGroupFilter(
@@ -1189,4 +1194,21 @@ public class TestBloomRowGroupFilter {
.as("Should read: filter contains non-reference evaluate as True")
.isTrue();
}
+
+  @Test
+  public void testIncompatibleColumnNameEq() {
+    // Probe with the same value expression that populated the column when the file was written.
+    for (int i = 0; i < INT_VALUE_COUNT; i++) {
+      BigDecimal present = new BigDecimal(String.valueOf(77.77 + i));
+      ParquetBloomRowGroupFilter presentFilter =
+          new ParquetBloomRowGroupFilter(SCHEMA, equal("incompatible-name", present));
+      boolean readForPresent =
+          presentFilter.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+      assertThat(readForPresent).as("Should read: decimal within range").isTrue();
+    }
+
+    // A value outside that sequence is expected to be ruled out by the bloom filter.
+    BigDecimal absent = new BigDecimal("1234.56");
+    ParquetBloomRowGroupFilter absentFilter =
+        new ParquetBloomRowGroupFilter(SCHEMA, equal("incompatible-name", absent));
+    boolean readForAbsent = absentFilter.shouldRead(parquetSchema, rowGroupMetadata, bloomStore);
+    assertThat(readForAbsent).as("Should not read: decimal outside range").isFalse();
+  }
}