Hi all,

I'd like to open a discussion on a new SPIP to introduce Change Data Capture (CDC) support to Apache Spark, targeting the Spark 4.2 release.

- SPIP Document: https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?usp=sharing
- JIRA: https://issues.apache.org/jira/browse/SPARK-55668

Motivation

Currently, querying row-level changes (inserts, updates, deletes) from a table requires connector-specific syntax. This fragmentation breaks query portability across different storage formats and forces each connector to reinvent complex post-processing logic:

- Delta Lake: Uses table_changes()
- Iceberg: Uses .changes virtual tables
- Hudi: Relies on custom incremental read options

There is no universal, engine-level standard in Spark to ask "show me what changed."

Proposal

This SPIP proposes a unified approach by adding a CHANGES SQL clause and corresponding DataFrame/DataStream APIs that work across DSv2 connectors.

1. Standardized User API

SQL:

    -- Batch: What changed between version 10 and 20?
    SELECT * FROM my_table CHANGES FROM VERSION 10 TO VERSION 20;

    -- Streaming: Continuously process changes
    CREATE STREAMING TABLE cdc_sink AS
    SELECT * FROM STREAM my_table CHANGES FROM VERSION 0;

DataFrame API:

    spark.read
      .option("startingVersion", "10")
      .option("endingVersion", "20")
      .changes("my_table")

2. Engine-Level Post Processing

Under the hood, this proposal introduces a minimal Changelog interface for DSv2 connectors. Spark's Catalyst optimizer will take over the CDC post-processing, including:

- Filtering out copy-on-write carry-over rows.
- Deriving pre-image/post-image updates from raw insert/delete pairs.
- Computing net changes.

This pushes complexity into the engine where it belongs, reducing duplicated effort across the ecosystem and ensuring consistent semantics for users.

Please review the full SPIP for comprehensive design details, the proposed connector API, and deduplication semantics. Feedback and discussion are highly appreciated!

Thanks,
Gengliang
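
To make the first two post-processing steps under "Engine-Level Post Processing" concrete, here is a minimal sketch in plain Python (no Spark involved). It is not the SPIP's design or the proposed Changelog interface: the Change record, its field names, the change-type labels, and both helper functions are hypothetical and chosen only for illustration. It assumes each row has a stable key and that the raw feed contains only inserts and deletes tagged with a commit version. Net-change computation is left out.

```python
from collections import Counter, defaultdict
from typing import NamedTuple


class Change(NamedTuple):
    """Hypothetical raw change record; not part of any Spark API."""
    key: int          # assumed stable row identity
    value: str        # remaining row payload
    change_type: str  # "insert" or "delete" in the raw feed
    version: int      # commit version that produced the change


def drop_carry_overs(changes):
    """Cancel delete/insert pairs with identical content in the same version.

    A copy-on-write rewrite re-emits untouched rows as a delete plus an
    identical insert; those pairs carry no logical change.
    """
    counts = Counter((c.change_type, c.version, c.key, c.value) for c in changes)
    to_skip = Counter()
    for (ctype, version, key, value), n in counts.items():
        if ctype == "insert":
            paired = min(n, counts.get(("delete", version, key, value), 0))
            to_skip[("insert", version, key, value)] = paired
            to_skip[("delete", version, key, value)] = paired
    kept = []
    for c in changes:
        ident = (c.change_type, c.version, c.key, c.value)
        if to_skip[ident] > 0:
            to_skip[ident] -= 1  # drop one half of a carry-over pair
        else:
            kept.append(c)
    return kept


def derive_updates(changes):
    """Relabel one delete plus one insert of the same key in the same
    version as an update pre-image and post-image."""
    groups = defaultdict(lambda: {"delete": [], "insert": []})
    for c in changes:
        groups[(c.version, c.key)][c.change_type].append(c)
    result = []
    for group_key in sorted(groups):
        g = groups[group_key]
        if len(g["delete"]) == 1 and len(g["insert"]) == 1:
            result.append(g["delete"][0]._replace(change_type="update_preimage"))
            result.append(g["insert"][0]._replace(change_type="update_postimage"))
        else:
            result.extend(g["delete"] + g["insert"])
    return result


raw = [
    Change(1, "a", "insert", 1),
    Change(2, "b", "insert", 1),
    # Version 2 rewrites the file: key 1 is carried over, key 2 is updated.
    Change(1, "a", "delete", 2),
    Change(1, "a", "insert", 2),
    Change(2, "b", "delete", 2),
    Change(2, "B", "insert", 2),
]

result = derive_updates(drop_carry_overs(raw))
for change in result:
    print(change)
```

In this toy feed, the delete/insert pair for key 1 in version 2 is dropped as a carry-over, and the pair for key 2 is relabeled as an update pre-image and post-image. A real connector would expose this through whatever interface the SPIP settles on.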
