[ 
https://issues.apache.org/jira/browse/SPARK-55668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-55668:
-----------------------------------
    Description: 
*SPIP Document:* 
[https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?usp=sharing]
h2. *The Problem*

Currently, querying row-level changes (inserts, updates, deletes) from a table 
requires connector-specific syntax. This fragmentation breaks query portability 
across different storage formats and forces each connector to reinvent complex 
post-processing logic:
 * *Delta Lake:* uses the {{table_changes()}} table-valued function
 * *Iceberg:* uses {{.changes}} virtual tables
 * *Hudi:* uses custom incremental read options

There is no universal, engine-level standard in Spark to ask "show me what 
changed."
h2. *The Proposal*

This SPIP proposes a unified approach by adding a CHANGES SQL clause and 
corresponding DataFrame/DataStream APIs that work seamlessly across any DSv2 
connector.
h3. *1. Standardized User API*

_SQL:_

 
{code:sql}
-- Batch: What changed between version 10 and 20?
SELECT * FROM my_table CHANGES FROM VERSION 10 TO VERSION 20;

-- Streaming: Continuously process changes
CREATE STREAMING TABLE cdc_sink AS
SELECT * FROM STREAM my_table CHANGES FROM VERSION 0;
{code}
 

_DataFrame API:_

 
{code:scala}
spark.read
  .option("startingVersion", "10")
  .option("endingVersion", "20")
  .changes("my_table")
{code}
h3. *2. Engine-Level Post Processing*

Under the hood, this proposal introduces a minimal Changelog interface for DSv2 
connectors. Spark's Catalyst optimizer will take over the CDC post-processing, 
including:
 * Filtering out copy-on-write carry-over rows.
 * Deriving pre-image/post-image updates from raw insert/delete pairs.
 * Computing net changes.

This pushes complexity into the engine where it belongs, reducing duplicated 
effort across the ecosystem and ensuring consistent semantics for users.
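The three post-processing steps above can be illustrated with a small, self-contained sketch. This is plain Python, not the proposed Spark implementation: the row layout {{(key, value, change_type, version)}}, the change-type labels, and the helper names are assumptions made for illustration only.

```python
# Illustrative sketch ONLY: plain-Python stand-in for the engine-level CDC
# post-processing described above. The row layout (key, value, change_type,
# version), the change-type labels, and the helper names are assumptions,
# not the proposed Spark implementation.
from collections import defaultdict

# Raw changelog rows as a connector might surface them.
raw = [
    ("a", 1, "delete", 11), ("a", 2, "insert", 11),  # genuine update of "a"
    ("b", 5, "delete", 11), ("b", 5, "insert", 11),  # copy-on-write carry-over: row rewritten unchanged
    ("c", 9, "insert", 12),                          # plain insert...
    ("c", 9, "delete", 13),                          # ...deleted again later
]

def postprocess(rows):
    """Drop carry-over rows and pair delete+insert into pre-/post-image updates."""
    by_commit = defaultdict(lambda: {"insert": [], "delete": []})
    for key, value, ctype, version in rows:
        by_commit[(key, version)][ctype].append(value)
    out = []
    for (key, version), ops in sorted(by_commit.items(), key=lambda kv: kv[0][1]):
        dels, ins = ops["delete"], ops["insert"]
        if dels and ins:
            if dels == ins:
                continue  # carry-over: same row rewritten, no logical change
            out.append((key, dels[0], "update_preimage", version))
            out.append((key, ins[0], "update_postimage", version))
        else:
            out.extend((key, v, "delete", version) for v in dels)
            out.extend((key, v, "insert", version) for v in ins)
    return out

def net_changes(rows):
    """Collapse the processed changelog to at most one net change per key."""
    events = defaultdict(list)
    for key, value, ctype, version in postprocess(rows):  # already version-ordered
        events[key].append((version, ctype, value))
    net = []
    for key, evs in events.items():
        existed_before = evs[0][1] in ("delete", "update_preimage")
        exists_after = evs[-1][1] in ("insert", "update_postimage")
        if existed_before and exists_after and evs[0][2] != evs[-1][2]:
            net.append((key, evs[-1][2], "update_postimage"))
        elif existed_before and not exists_after:
            net.append((key, evs[0][2], "delete"))
        elif exists_after and not existed_before:
            net.append((key, evs[-1][2], "insert"))
        # inserted then deleted inside the range -> no net change
    return net
```

On the sample data, {{postprocess}} drops the carry-over pair for "b" and rewrites the delete/insert pair for "a" as an update pre-image and post-image, while {{net_changes}} collapses "c" (inserted then deleted within the range) to nothing. Today each connector re-implements logic of roughly this shape; the proposal moves it behind one Catalyst-owned code path.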



> Change Data Capture (CDC) Support
> ---------------------------------
>
>                 Key: SPARK-55668
>                 URL: https://issues.apache.org/jira/browse/SPARK-55668
>             Project: Spark
>          Issue Type: Umbrella
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Gengliang Wang
>            Priority: Major
>              Labels: SPIP
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
