jalpan-randeri opened a new pull request, #16210: URL: https://github.com/apache/iceberg/pull/16210
## Overview

This PR introduces the core Iceberg-side support for predicate pushdown within Spark Structured Streaming. Currently, Spark streaming workloads on Iceberg tables lack the pruning efficiencies available in batch scans. This change bridges that gap by enabling the `SparkScan` to propagate filter expressions to the `MicroBatchStream`. By moving filter evaluation from the Spark executor level to the Iceberg metadata level (during partition planning), we significantly reduce:

1. **I/O Overhead**: Fewer manifests and data files are scanned.
2. **Driver Memory Pressure**: Reduced metadata processing during task planning.
3. **Compute Costs**: Lower scheduling overhead for micro-batches on high-cardinality partitioned tables.

> **Note on dependencies:** This commit provides the necessary interface and implementation in the Iceberg connector. To fully leverage it, a corresponding PR will be raised in the Apache Spark repository to ensure the engine correctly passes these predicates to the V2 streaming source.

Fixes https://github.com/apache/iceberg/issues/15692

## Testing & Verification

To validate the end-to-end flow of predicate propagation, from the Spark `Dataset` definition down to Iceberg file planning, I performed a trace analysis using an instrumented local build.

### 1. Test Environment & Setup

A streaming query was initialized on a partitioned table with the following schema:

```scala
// streamingDF: a streaming Dataset[Row] read from a partitioned Iceberg table
// with schema [id: bigint, part: string]
val query = streamingDF
  .filter("part = 'active'")
  .writeStream
  .format("console")
  .start()
```

### 2. Execution Trace Analysis

The following logs capture the lifecycle of the predicate.
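Before reading the trace, the hand-off being exercised can be modeled in simplified form. The class and method names below loosely mirror the Spark DSv2 shapes (`SparkScan`, `MicroBatchStream`, `planInputPartitions`), but this is an illustrative sketch of the threading pattern, not the actual Iceberg classes:

```java
// Simplified model of the hand-off this PR enables: the scan captures the
// pushed-down filter once and threads the same expression into the stream it
// creates, so the filter is visible at planning time. Names are illustrative
// assumptions, not the real Iceberg/Spark classes.
class FilterHandOffSketch {

  static class MicroBatchStreamSketch {
    private final String filter;

    MicroBatchStreamSketch(String filter) {
      this.filter = filter;
    }

    // With the filter available here, pruning can happen during planning
    // rather than per-row on the executors.
    String planInputPartitions() {
      return "planning input partitions with filter [" + filter + "]";
    }
  }

  static class SparkScanSketch {
    private final String pushedFilter;

    SparkScanSketch(String pushedFilter) {
      this.pushedFilter = pushedFilter;
    }

    // The core of the change: the stream is constructed with the scan's
    // pushed filter instead of discarding it.
    MicroBatchStreamSketch toMicroBatchStream() {
      return new MicroBatchStreamSketch(pushedFilter);
    }
  }

  public static void main(String[] args) {
    SparkScanSketch scan = new SparkScanSketch("ref(name=\"part\") == \"active\"");
    // Prints: planning input partitions with filter [ref(name="part") == "active"]
    System.out.println(scan.toMicroBatchStream().planInputPartitions());
  }
}
```

The printed line matches the shape of the `planning input partitions` log entries in the trace below.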
Note the `[jalpan]` identifiers, which trace the filter `[ref(name="part") == "active"]` as it is handed off from the `SparkScan` to the `MicroBatchStream` and used during the critical planning phase:

```plaintext
// Initialization: SparkScan captures the intent to stream
26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream

// Hand-off: predicate is successfully threaded into the MicroBatchStream
26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch with filter [ref(name="part") == "active"]

// Execution: predicate is applied during the planning phase to prune files
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input partitions micro batch with filter [ref(name="part") == "active"]
...
```

### 3. Correctness Validation

The output confirms that the pushdown was not only syntactically correct but functionally accurate, returning only the records matching the pushed-down predicate:

```plaintext
-------------------------------------------
Batch: 0
-------------------------------------------
+---+------+
| id|  part|
+---+------+
|  1|active|
+---+------+
```

### Conclusion of Testing

The trace confirms that the `MicroBatchStream` is now "predicate-aware." Because the filter is present during the planning-input-partitions phase, the Iceberg `ManifestGroup` can exclude non-matching files before Spark even begins task execution.
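To illustrate the payoff described above, planning-time pruning can be modeled as filtering a file listing by partition value before any task exists. The file-to-partition map here is invented for illustration; in Iceberg, the `ManifestGroup` performs the real evaluation against partition metadata in the manifests:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model of planning-time pruning: once the predicate is visible while
// planning input partitions, any data file whose partition value cannot match
// is dropped before a single Spark task is scheduled. File names and the
// map-based "metadata" are hypothetical, for illustration only.
class PlanningPruneSketch {

  // Keep only the files whose partition value satisfies part == required.
  static List<String> prune(Map<String, String> fileToPartValue, String required) {
    return fileToPartValue.entrySet().stream()
        .filter(e -> e.getValue().equals(required))
        .map(Map.Entry::getKey)
        .sorted() // deterministic order for display
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, String> files = Map.of(
        "data-a.parquet", "active",
        "data-b.parquet", "archived",
        "data-c.parquet", "active");
    // Only the part='active' files survive planning; data-b is never read.
    System.out.println(prune(files, "active")); // [data-a.parquet, data-c.parquet]
  }
}
```

Without the pushdown, all three files would be scheduled and the `archived` rows filtered row-by-row on the executors.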
