raghav-reglobe opened a new pull request, #16363:
URL: https://github.com/apache/iceberg/pull/16363

   ## What changes are proposed in this pull request?
   
   Add a new table property to enable parquet-mr's adaptive bloom filter sizing
   ([PARQUET-2326](https://issues.apache.org/jira/browse/PARQUET-2326)) for Iceberg-managed Parquet writes:
   
   - `write.parquet.bloom-filter-adaptive-enabled` (boolean, default `false`)
   
   When enabled, parquet-mr's `ColumnValueCollector.initBloomFilter()` constructs an
   `AdaptiveBlockSplitBloomFilter` instead of a `BlockSplitBloomFilter`. The adaptive variant
   evaluates N candidate filter sizes and picks the smallest one that satisfies the actual number of
   distinct values (NDV) at the configured false-positive probability (FPP), instead of always
   pre-allocating `bloom-filter-max-bytes`.
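
As a rough illustration of the selection idea (not parquet-mr's actual implementation), the sketch below picks the smallest candidate size that can hold the observed NDV at a given FPP. The class name, the halving scheme for candidate sizes, and the `numCandidates` parameter are assumptions made for this example; the bit estimate is the usual split-block formula `m = -8n / ln(1 - p^(1/8))`.

```java
public final class AdaptiveSizeSketch {
    // Estimated bits needed for ndv distinct values at false-positive
    // probability fpp, using m = -8n / ln(1 - p^(1/8)).
    static long bitsNeeded(long ndv, double fpp) {
        double m = -8.0 * ndv / Math.log(1.0 - Math.pow(fpp, 1.0 / 8.0));
        return (long) Math.ceil(m);
    }

    // Hypothetical candidate sizes in bytes: maxBytes, maxBytes/2, maxBytes/4, ...
    static long[] candidateSizes(long maxBytes, int numCandidates) {
        long[] sizes = new long[numCandidates];
        for (int i = 0; i < numCandidates; i++) {
            sizes[i] = maxBytes >> i;
        }
        return sizes;
    }

    // Returns the smallest candidate whose capacity covers the observed NDV.
    // Falls back to maxBytes when no candidate is large enough.
    static long pickSize(long observedNdv, double fpp, long maxBytes, int numCandidates) {
        long needed = bitsNeeded(observedNdv, fpp);
        long best = maxBytes;
        for (long size : candidateSizes(maxBytes, numCandidates)) {
            if (size * 8 >= needed && size < best) {
                best = size;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // 5 distinct values, 1% FPP, 4 MiB cap, 5 candidates (all illustrative inputs).
        System.out.println(pickSize(5, 0.01, 4_194_304L, 5));
    }
}
```

With a fixed-size filter the writer would always use `maxBytes`; in this sketch a handful of distinct values lands on the smallest candidate, and only a very high NDV falls back to the cap.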
   
   ## Why are the changes needed?
   
   Today, when a bloom filter is enabled on a column without a per-column NDV, parquet-mr's
   `ColumnValueCollector.initBloomFilter()` allocates a fixed `bloom-filter-max-bytes` buffer:
   
   ```java
   // from ColumnValueCollector
   } else {
     this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
   }
   ```
   
   The buffer is then written to disk via `ParquetFileWriter.serializeBloomFilters()` regardless of
   how many values were inserted. For low-row-count writes this produces a file dominated by an
   empty bloom filter.
   
   Empirical observations from a Spark Structured Streaming + Iceberg pipeline (~720 silver
   tables on a warm 600s trigger):
   
   | TBLPROPERTIES                                   | File size for 5-row write              |
   |---|---|
   | no bloom                                        | 902 bytes                              |
   | `bloom-enabled.col.id=true, max-bytes=4194304`  | 4,201,826 bytes (~4 MiB)               |
   | **+ adaptive (this PR)**                        | **268,465 bytes (~16x reduction)**     |
   
   For workloads that produce frequent low-row-count microbatches (CDC streaming, frequent commits),
   this is a significant storage and S3 PUT cost reduction.
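
The arithmetic behind the table can be checked with a few lines. The sketch below uses only the three file sizes reported above; treating the difference from the no-bloom file as bloom filter overhead is an assumption of this example, and the class and method names are made up for illustration.

```java
public final class BloomOverheadSketch {
    // File sizes in bytes for a 5-row write, copied from the table above.
    static final long NO_BLOOM = 902L;
    static final long FIXED_BLOOM = 4_201_826L;
    static final long ADAPTIVE_BLOOM = 268_465L;

    // Bytes added on top of the no-bloom file (assumed to be bloom filter overhead).
    static long overhead(long fileSize) {
        return fileSize - NO_BLOOM;
    }

    // How many times smaller the adaptive file is than the fixed-size one.
    static double reductionFactor() {
        return (double) FIXED_BLOOM / ADAPTIVE_BLOOM;
    }

    public static void main(String[] args) {
        System.out.printf("fixed overhead:    %,d bytes%n", overhead(FIXED_BLOOM));
        System.out.printf("adaptive overhead: %,d bytes%n", overhead(ADAPTIVE_BLOOM));
        System.out.printf("reduction factor:  %.2fx%n", reductionFactor());
    }
}
```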
   
   ## How was this patch tested?
   
   - New unit test `TestParquetAdaptiveBloomFilter` covers both:
     - Adaptive enabled: verifies the file is at least 2x smaller than the non-adaptive one
     - Default behavior: verifies existing (non-adaptive) behavior is preserved when
       the property is not set
   - Empirically verified on a production Spark Structured Streaming + Iceberg pipeline.
     File sizes dropped from ~4 MiB to ~262 KiB (268,465 bytes) on streaming microbatch outputs.
   
   ## Backward compatibility
   
   The default value is `false`, so existing tables and writers see no behavior change.
   Operators opt in by setting `write.parquet.bloom-filter-adaptive-enabled=true`.
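
To make the opt-in semantics concrete, here is a minimal, self-contained sketch of how the property could be resolved from a table's property map. It is a stand-in written for this description, not the code in `Parquet.java`; the class, constant, and method names are hypothetical, and only the property key and its `false` default come from this PR.

```java
import java.util.Map;

public final class AdaptivePropertySketch {
    // Property key and default as described in this PR.
    static final String ADAPTIVE_ENABLED = "write.parquet.bloom-filter-adaptive-enabled";
    static final boolean ADAPTIVE_ENABLED_DEFAULT = false;

    // Returns the default when the property is absent, otherwise parses the value.
    static boolean adaptiveEnabled(Map<String, String> tableProperties) {
        String value = tableProperties.get(ADAPTIVE_ENABLED);
        return value == null ? ADAPTIVE_ENABLED_DEFAULT : Boolean.parseBoolean(value);
    }

    public static void main(String[] args) {
        // Existing table with no adaptive property set: behavior is unchanged.
        System.out.println(adaptiveEnabled(Map.of()));
        // Table that opted in explicitly.
        System.out.println(adaptiveEnabled(Map.of(ADAPTIVE_ENABLED, "true")));
    }
}
```

Tables that never set the key resolve to `false`, which is why existing writers keep the fixed-size filter.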
   
   ## Scope
   
   This PR modifies the `createWriterFunc` code path (used by Spark, Flink, and other engines for
   data writes). The legacy `ParquetWriteBuilder` fallback path (`createWriterFunc == null`) is
   unchanged. If maintainers want adaptive support on the legacy path as well, I am happy to extend
   it in a follow-up.
   
   ## Files changed
   
   - `core/src/main/java/org/apache/iceberg/TableProperties.java`: add 1 constant + default
   - `parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java`: wire the property through
     Context + use in WriteBuilder
   - `parquet/src/test/java/org/apache/iceberg/parquet/TestParquetAdaptiveBloomFilter.java`: new test
   



