jalpan-randeri opened a new pull request, #16210:
URL: https://github.com/apache/iceberg/pull/16210

   ## Overview
   
   This PR introduces the core Iceberg-side support for predicate pushdown 
within Spark Structured Streaming.
   
   Currently, Spark streaming workloads on Iceberg tables lack the partition 
and file pruning available to batch scans. This change bridges that gap by 
enabling the SparkScan to propagate filter expressions to the MicroBatchStream. 
By moving filter evaluation from the Spark executor level to the Iceberg 
metadata level (during partition planning), we significantly reduce:
   
   1. **I/O Overhead**: Fewer manifests and data files are scanned.
   2. **Driver Memory Pressure**: Reduced metadata processing during task 
planning.
   3. **Compute Costs**: Lower scheduling overhead for micro-batches on 
high-cardinality partitioned tables.
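
   The effect of planning-time pruning can be sketched in a few lines of plain 
Scala. This is a toy model under hypothetical names (`DataFile`, `planFiles`), 
not Iceberg's actual planning code:
   
   ```scala
   // Self-contained sketch of planning-time partition pruning: files whose
   // partition value fails the predicate are dropped before any Spark task
   // is scheduled, so executors never open them.
   object PruningSketch {
     final case class DataFile(path: String, partition: String)

     // The predicate is evaluated against partition metadata only, mirroring
     // the effect of pushing part = 'active' down into planning.
     def planFiles(files: Seq[DataFile], part: String): Seq[DataFile] =
       files.filter(_.partition == part)

     def main(args: Array[String]): Unit = {
       val files = Seq(
         DataFile("f1.parquet", "active"),
         DataFile("f2.parquet", "inactive"),
         DataFile("f3.parquet", "active")
       )
       // f2.parquet is excluded during planning and is never read.
       println(planFiles(files, "active").map(_.path).mkString(", "))
     }
   }
   ```
   
   Without pushdown, all three files would become input partitions and the 
filter would only discard rows on the executors.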
   
   > Note on Dependencies: This commit provides the necessary interface and 
implementation in the Iceberg connector. To fully leverage this, a 
corresponding PR will be raised in the Apache Spark repository to ensure the 
engine correctly passes these predicates to the V2 streaming source.
   
   Fixes https://github.com/apache/iceberg/issues/15692
   
   ## Testing & Verification
   
   To validate the end-to-end flow of predicate propagation—from the Spark 
Dataset definition down to the Iceberg file planning—I performed a trace 
analysis using an instrumented local build.
   
   ### 1. Test Environment & Setup
   
   A streaming query was initialized on a partitioned table with the schema 
`id: bigint, part: string`:
   
   ```scala
   // Streaming read from a partitioned Iceberg table with schema
   // (id: bigint, part: string); the table identifier is illustrative.
   val streamingDF = spark.readStream
     .format("iceberg")
     .load("db.table")

   val query = streamingDF
     .filter("part = 'active'")
     .writeStream
     .format("console")
     .start()
   ```
   
   ### 2. Execution Trace Analysis
   
   The following logs capture the lifecycle of the predicate. Note the [jalpan] 
identifiers, which trace the filter [ref(name="part") == "active"] as it is 
handed off from the SparkScan to the MicroBatchStream and utilized during the 
critical planning phase:
   
   ```text
   // Initialization: SparkScan captures the intent to stream
   26/05/03 20:32:57 ERROR SparkScan: [jalpan] creating micro batch stream
   
   // Hand-off: Predicate is successfully threaded into the MicroBatchStream
   26/05/03 20:32:57 ERROR SparkMicroBatchStream: [jalpan] creating micro batch 
with filter [ref(name="part") == "active"]
   
   // Execution: Predicate is applied during the Planning Phase to prune files
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input 
partitions micro batch with filter [ref(name="part") == "active"]
   26/05/03 20:32:58 ERROR SparkMicroBatchStream: [jalpan] planning input 
partitions micro batch with filter [ref(name="part") == "active"]
   ...
   ```
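
   The hand-off the logs trace can be pictured with a minimal sketch. The 
`ScanSketch` and `MicroBatchStreamSketch` classes below are toy stand-ins for 
SparkScan and SparkMicroBatchStream; the real implementation passes Iceberg 
Expression objects rather than this string-based filter:
   
   ```scala
   // Illustrative shape of the hand-off: the scan captures the pushed filter
   // and threads it into the stream it creates, so the filter is available
   // when input partitions are planned for each micro-batch.
   object HandOffSketch {
     final case class Filter(column: String, value: String)

     // Stand-in for SparkMicroBatchStream: the filter it is constructed with
     // prunes candidate (path, partition) pairs at planning time.
     class MicroBatchStreamSketch(filter: Option[Filter]) {
       def planInputPartitions(files: Seq[(String, String)]): Seq[String] =
         filter match {
           case Some(f) => files.collect { case (path, part) if part == f.value => path }
           case None    => files.map(_._1)
         }
     }

     // Stand-in for SparkScan: captures the pushed predicate and hands it to
     // the stream it creates.
     class ScanSketch(pushed: Option[Filter]) {
       def toMicroBatchStream: MicroBatchStreamSketch =
         new MicroBatchStreamSketch(pushed)
     }
   }
   ```
   
   With the filter present, planning over `Seq(("f1", "active"), ("f2", 
"archived"))` yields only `f1`; without it, every file becomes an input 
partition.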
   
   ### 3. Correctness Validation
   
   The output confirms that the pushdown was not only syntactically correct but 
functionally accurate, returning only the records matching the pushed-down 
predicate:
   
   ```text
   -------------------------------------------
   Batch: 0
   -------------------------------------------
   +---+------+
   | id|  part|
   +---+------+
   |  1|active|
   +---+------+
   ```
   
   ### Conclusion of Testing
   
   The trace confirms that the MicroBatchStream is now "predicate-aware." By 
having the filter present during the planning input partitions phase, the 
Iceberg ManifestGroup can successfully exclude non-matching files before Spark 
even begins task execution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

