This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d04be9b6df Add table property to disable/enable parquet column
statistics #12770 (#12771)
d04be9b6df is described below
commit d04be9b6df0e4bc59bceb6a80c2b828d5318d270
Author: huaxiangsun <[email protected]>
AuthorDate: Mon May 12 16:06:22 2025 -0700
Add table property to disable/enable parquet column statistics #12770
(#12771)
* add table properties to disable/enable parquet column statistics
* Address review comments
1). Moved unit test to parquet module.
2). Added documentation for new table properties.
* Address more review comments
* Address review comments about documentation
* 1. Removed default config for column statistics, wait until the parquet
release with fix is updated in Iceberg release.
2. Addressed the comments to remove fieldIdToParquetPath map, replaced with
columnNameToParquetPath map.
3. Added missing code for schema update of the new parquet column
statistics enable config.
* refactor code
---------
Co-authored-by: Huaxiang Sun <[email protected]>
---
.../main/java/org/apache/iceberg/SchemaUpdate.java | 3 +-
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../apache/iceberg/TestSchemaAndMappingUpdate.java | 15 +++++
docs/docs/configuration.md | 1 +
.../java/org/apache/iceberg/parquet/Parquet.java | 77 ++++++++++++++++------
.../org/apache/iceberg/parquet/TestParquet.java | 46 +++++++++++++
6 files changed, 124 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
index db02a0e96e..8f2bfe184c 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -516,7 +516,8 @@ class SchemaUpdate implements UpdateSchema {
Set<String> columnProperties =
ImmutableSet.of(
TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX,
- TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+ TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX,
+ TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX);
Map<String, String> updatedProperties =
PropertyUtil.applySchemaChanges(
newMetadata.properties(), deletedColumns, renamedColumns,
columnProperties);
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 577ce653e0..812c0543b4 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -174,6 +174,9 @@ public class TableProperties {
public static final String PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
"write.parquet.bloom-filter-enabled.column.";
+ public static final String PARQUET_COLUMN_STATS_ENABLED_PREFIX =
+ "write.parquet.stats-enabled.column.";
+
public static final String AVRO_COMPRESSION = "write.avro.compression-codec";
public static final String DELETE_AVRO_COMPRESSION =
"write.delete.avro.compression-codec";
public static final String AVRO_COMPRESSION_DEFAULT = "gzip";
diff --git
a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
index dc6e9e1a7f..84f1a2ec90 100644
--- a/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
+++ b/core/src/test/java/org/apache/iceberg/TestSchemaAndMappingUpdate.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
import static
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static
org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -191,6 +192,20 @@ public class TestSchemaAndMappingUpdate extends TestBase {
table.properties().get(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX
+ "ID"));
}
+ /**
+  * Verifies that the per-column Parquet statistics property follows schema changes: a column
+  * rename moves the property to the new column name, and a column delete removes it.
+  */
+ @TestTemplate
+ public void testModificationWithParquetColumnStats() {
+   table.updateProperties().set(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id", "true").commit();
+
+   // After the rename the property must exist under the new name only.
+   table.updateSchema().renameColumn("id", "ID").commit();
+   assertThat(table.properties())
+       .containsEntry(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID", "true")
+       .doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "id");
+
+   // After the delete the property key itself must be gone. Assert on the key, not on the
+   // looked-up value: passing the value (null or "true") as a key can never fail.
+   table.updateSchema().deleteColumn("ID").commit();
+   assertThat(table.properties())
+       .doesNotContainKey(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "ID");
+ }
+
@TestTemplate
public void testDeleteAndAddColumnReassign() {
NameMapping mapping = MappingUtil.create(table.schema());
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index c784566ef5..2c9f6608cb 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -52,6 +52,7 @@ Iceberg tables support table properties to configure table
behavior, like the de
| write.parquet.bloom-filter-enabled.column.col1 | (not set)
| Hint to parquet to write a bloom filter for the column: 'col1'
|
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB)
| The maximum number of bytes for a bloom filter bitset
|
| write.parquet.bloom-filter-fpp.column.col1 | 0.01
| The false positive probability for a bloom filter applied to 'col1'
(must > 0.0 and < 1.0)
|
+| write.parquet.stats-enabled.column.col1 | (not set)
| Controls whether to collect parquet column statistics for column 'col1'
|
| write.avro.compression-codec | gzip
| Avro compression codec: gzip(deflate with 9 level), zstd, snappy,
uncompressed
|
| write.avro.compression-level | null
| Avro compression level
|
| write.orc.stripe-size-bytes | 67108864 (64 MB)
| Define the default ORC stripe size, in bytes
|
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 31f9e2a80a..6f68fbe150 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -30,6 +30,7 @@ import static
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENA
import static
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
import static
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
import static
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static
org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
@@ -98,7 +99,6 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.PropertyUtil;
@@ -120,6 +120,7 @@ import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type.ID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -308,31 +309,17 @@ public class Parquet {
private void setBloomFilterConfig(
Context context,
- MessageType parquetSchema,
+ Map<String, String> colNameToParquetPathMap,
BiConsumer<String, Boolean> withBloomFilterEnabled,
BiConsumer<String, Double> withBloomFilterFPP) {
- Map<Integer, String> fieldIdToParquetPath =
- parquetSchema.getColumns().stream()
- .filter(col -> col.getPrimitiveType().getId() != null)
- .collect(
- Collectors.toMap(
- col -> col.getPrimitiveType().getId().intValue(),
- col -> String.join(".", col.getPath())));
-
context
.columnBloomFilterEnabled()
.forEach(
(colPath, isEnabled) -> {
- Types.NestedField fieldId = schema.findField(colPath);
- if (fieldId == null) {
- LOG.warn("Skipping bloom filter config for missing field:
{}", colPath);
- return;
- }
-
- String parquetColumnPath =
fieldIdToParquetPath.get(fieldId.fieldId());
+ String parquetColumnPath =
colNameToParquetPathMap.get(colPath);
if (parquetColumnPath == null) {
- LOG.warn("Skipping bloom filter config for missing field:
{}", fieldId);
+ LOG.warn("Skipping bloom filter config for missing field:
{}", colPath);
return;
}
@@ -344,6 +331,24 @@ public class Parquet {
});
}
+ /**
+  * Forwards the per-column statistics settings held by the write context to a Parquet builder.
+  *
+  * <p>Settings whose column name has no entry in {@code colNameToParquetPathMap} are skipped
+  * with a warning.
+  *
+  * @param context write context that supplies the per-column statistics settings
+  * @param colNameToParquetPathMap lookup from column name to Parquet column path
+  * @param withColumnStatsEnabled callback that receives the Parquet column path and the flag
+  */
+ private void setColumnStatsConfig(
+     Context context,
+     Map<String, String> colNameToParquetPathMap,
+     BiConsumer<String, Boolean> withColumnStatsEnabled) {
+   for (Map.Entry<String, String> setting : context.columnStatsEnabled().entrySet()) {
+     String columnName = setting.getKey();
+     String parquetPath = colNameToParquetPathMap.get(columnName);
+     if (parquetPath != null) {
+       // The setting is stored as a string; anything other than "true" parses to false.
+       withColumnStatsEnabled.accept(parquetPath, Boolean.parseBoolean(setting.getValue()));
+     } else {
+       LOG.warn("Skipping column statistics config for missing field: {}", columnName);
+     }
+   }
+ }
+
@Override
public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
@@ -401,6 +406,18 @@ public class Parquet {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with
null encryption key");
}
+ Map<String, String> colNameToParquetPathMap =
+ type.getColumns().stream()
+ .filter(
+ col -> {
+ ID id = col.getPrimitiveType().getId();
+ return (id != null) &&
(schema.findColumnName(id.intValue()) != null);
+ })
+ .collect(
+ Collectors.toMap(
+ col ->
schema.findColumnName(col.getPrimitiveType().getId().intValue()),
+ col -> String.join(".", col.getPath())));
+
if (createWriterFunc != null) {
Preconditions.checkArgument(
writeSupport == null, "Cannot write with both write support and
Parquet value writer");
@@ -421,7 +438,12 @@ public class Parquet {
.withMaxBloomFilterBytes(bloomFilterMaxBytes);
setBloomFilterConfig(
- context, type, propsBuilder::withBloomFilterEnabled,
propsBuilder::withBloomFilterFPP);
+ context,
+ colNameToParquetPathMap,
+ propsBuilder::withBloomFilterEnabled,
+ propsBuilder::withBloomFilterFPP);
+
+ setColumnStatsConfig(context, colNameToParquetPathMap,
propsBuilder::withStatisticsEnabled);
ParquetProperties parquetProperties = propsBuilder.build();
@@ -457,10 +479,13 @@ public class Parquet {
setBloomFilterConfig(
context,
- type,
+ colNameToParquetPathMap,
parquetWriteBuilder::withBloomFilterEnabled,
parquetWriteBuilder::withBloomFilterFPP);
+ setColumnStatsConfig(
+ context, colNameToParquetPathMap,
parquetWriteBuilder::withStatisticsEnabled);
+
return new ParquetWriteAdapter<>(parquetWriteBuilder.build(),
metricsConfig);
}
}
@@ -477,6 +502,7 @@ public class Parquet {
private final int bloomFilterMaxBytes;
private final Map<String, String> columnBloomFilterFpp;
private final Map<String, String> columnBloomFilterEnabled;
+ private final Map<String, String> columnStatsEnabled;
private final boolean dictionaryEnabled;
private Context(
@@ -491,6 +517,7 @@ public class Parquet {
int bloomFilterMaxBytes,
Map<String, String> columnBloomFilterFpp,
Map<String, String> columnBloomFilterEnabled,
+ Map<String, String> columnStatsEnabled,
boolean dictionaryEnabled) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -503,6 +530,7 @@ public class Parquet {
this.bloomFilterMaxBytes = bloomFilterMaxBytes;
this.columnBloomFilterFpp = columnBloomFilterFpp;
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
+ this.columnStatsEnabled = columnStatsEnabled;
this.dictionaryEnabled = dictionaryEnabled;
}
@@ -564,6 +592,9 @@ public class Parquet {
Map<String, String> columnBloomFilterEnabled =
PropertyUtil.propertiesWithPrefix(config,
PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+ Map<String, String> columnStatsEnabled =
+ PropertyUtil.propertiesWithPrefix(config,
PARQUET_COLUMN_STATS_ENABLED_PREFIX);
+
boolean dictionaryEnabled =
PropertyUtil.propertyAsBoolean(config,
ParquetOutputFormat.ENABLE_DICTIONARY, true);
@@ -579,6 +610,7 @@ public class Parquet {
bloomFilterMaxBytes,
columnBloomFilterFpp,
columnBloomFilterEnabled,
+ columnStatsEnabled,
dictionaryEnabled);
}
@@ -647,6 +679,7 @@ public class Parquet {
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
ImmutableMap.of(),
ImmutableMap.of(),
+ ImmutableMap.of(),
dictionaryEnabled);
}
@@ -702,6 +735,10 @@ public class Parquet {
return columnBloomFilterEnabled;
}
+ /** Returns the per-column statistics-enabled settings held by this context. */
+ Map<String, String> columnStatsEnabled() {
+   return this.columnStatsEnabled;
+ }
+
boolean dictionaryEnabled() {
return dictionaryEnabled;
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index ae0a822d34..5eabeb02d3 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.parquet;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.iceberg.Files.localInput;
+import static
org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
import static
org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
@@ -57,6 +58,7 @@ import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -219,6 +221,50 @@ public class TestParquet {
assertThat(recordRead.get("topbytes")).isEqualTo(expectedBinary);
}
+ /**
+  * Writes a two-column file with statistics enabled for {@code int_field} and disabled for
+  * {@code string_field}, then checks the column chunk statistics in the written footer.
+  */
+ @Test
+ public void testColumnStatisticsEnabled() throws Exception {
+   Schema schema =
+       new Schema(
+           optional(1, "int_field", IntegerType.get()),
+           optional(2, "string_field", Types.StringType.get()));
+
+   File file = createTempFile(temp);
+
+   List<GenericData.Record> records = Lists.newArrayListWithCapacity(5);
+   org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+   for (int i = 1; i <= 5; i++) {
+     GenericData.Record record = new GenericData.Record(avroSchema);
+     record.put("int_field", i);
+     record.put("string_field", "test");
+     records.add(record);
+   }
+
+   write(
+       file,
+       schema,
+       ImmutableMap.<String, String>builder()
+           .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "int_field", "true")
+           .put(PARQUET_COLUMN_STATS_ENABLED_PREFIX + "string_field", "false")
+           .buildOrThrow(),
+       ParquetAvroWriter::buildWriter,
+       records.toArray(new GenericData.Record[] {}));
+
+   InputFile inputFile = Files.localInput(file);
+
+   // Track which columns were actually inspected so the test cannot pass vacuously when the
+   // footer has no row groups or the column paths do not match the expected names.
+   boolean intFieldChecked = false;
+   boolean stringFieldChecked = false;
+
+   try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(inputFile))) {
+     for (BlockMetaData block : reader.getFooter().getBlocks()) {
+       for (ColumnChunkMetaData column : block.getColumns()) {
+         String path = column.getPath().toDotString();
+         boolean emptyStats = column.getStatistics().isEmpty();
+         if (path.equals("int_field")) {
+           assertThat(emptyStats).as("int_field statistics should be present").isFalse();
+           intFieldChecked = true;
+         } else if (path.equals("string_field")) {
+           assertThat(emptyStats).as("string_field statistics should be empty").isTrue();
+           stringFieldChecked = true;
+         }
+       }
+     }
+   }
+
+   assertThat(intFieldChecked).as("int_field column chunk was inspected").isTrue();
+   assertThat(stringFieldChecked).as("string_field column chunk was inspected").isTrue();
+ }
+
private Pair<File, Long> generateFile(
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
int desiredRecordCount,