hantmac opened a new issue, #9615: URL: https://github.com/apache/seatunnel/issues/9615
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

## High-Performance CDC Support for Databend Sink Connector

### Background

Databend is an OLAP database optimized for analytical workloads with columnar storage. In CDC (Change Data Capture) scenarios, executing individual UPDATE and DELETE statements significantly hurts performance and does not leverage Databend's strengths in batch processing.

Currently, the Databend sink connector in SeaTunnel supports batch INSERT operations, but it lacks efficient handling of UPDATE and DELETE operations in CDC scenarios.

### Proposed Solution

I propose implementing a high-performance CDC mode for the Databend sink connector using Databend's Stream and MERGE INTO capabilities.

#### Architecture Overview

1. **CDC Mode Activation**: When users specify `conflict_key` in the configuration, the connector switches to CDC mode.

2. **Raw Table Design**: Create a raw staging table with the following schema:

   ```sql
   CREATE TABLE cdc_raw_table (
       id VARCHAR(255),
       table_name VARCHAR(255),
       raw_data JSON,
       add_time TIMESTAMP,
       action STRING
   )
   ```

3. **Stream Creation**: Create a Databend Stream to monitor changes on the raw table:

   ```sql
   CREATE STREAM cdc_stream ON TABLE cdc_raw_table
   ```

4. **Two-Phase Processing**:
   - **Phase 1**: SeaTunnel writes all CDC events (INSERT/UPDATE/DELETE) to the raw table as JSON
   - **Phase 2**: A background thread periodically executes MERGE INTO operations

5. **Merge Logic**:

   ```sql
   MERGE INTO target_table a
   USING (
       SELECT
           id,  -- exposed so the ON clause below can reference b.id
           raw_data:column1 AS column1,
           raw_data:column2 AS column2,
           raw_data:column3 AS column3,
           action
       FROM cdc_stream
       -- keep only the most recent event per key
       QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY add_time DESC) = 1
   ) b
   ON a.id = b.id
   WHEN MATCHED AND b.action = 'update' THEN UPDATE *
   WHEN MATCHED AND b.action = 'delete' THEN DELETE
   WHEN NOT MATCHED AND b.action != 'delete' THEN INSERT *
   ```

### Benefits

1. **Performance**: Batch processing of CDC events leverages Databend's columnar storage optimization
2. **Efficiency**: Reduces network round trips and transaction overhead
3. **Scalability**: Handles high-volume CDC workloads effectively
4. **Consistency**: Stream ensures no data loss and maintains order of operations

### Configuration Example

```hocon
sink {
  Databend {
    url = "databend://localhost:8000"
    username = "user"
    password = "password"
    database = "mydb"
    table = "target_table"

    # Enable CDC mode
    conflict_key = "id"

    # CDC specific configurations
    cdc {
      enable = true
      raw_table = "cdc_raw_table"
      stream_name = "cdc_stream"
      merge_interval_seconds = 30
      merge_batch_size = 10000
    }
  }
}
```

### Implementation Considerations

1. **Schema Evolution**: Handle schema changes between source and target
2. **Error Handling**: Implement retry logic for failed merge operations
3. **Monitoring**: Expose metrics for CDC lag and processing rate
4. **Cleanup**: Automatic cleanup of processed records from raw table

### Questions for Discussion

1. Should we support multiple target tables with a single raw table?
2. What's the best strategy for handling very large transactions?
3. Should we provide an option to fall back to direct DML for low-volume scenarios?
4. How should we handle concurrent merge operations?
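
To make the intended merge semantics easier to discuss, here is a small in-memory model of the merge step: keep only the latest event per `id`, then apply update, delete, or insert depending on whether the key already exists in the target. This is only a sketch for reasoning about edge cases, not connector code. The function names (`latest_event_per_key`, `apply_cdc_batch`) and the dict-based event shape are assumptions made for illustration and are not part of SeaTunnel or Databend.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]  # hypothetical shape: id, raw_data, add_time, action


def latest_event_per_key(events: List[Event]) -> Dict[str, Event]:
    """Keep the newest event per id, like QUALIFY ROW_NUMBER() ... = 1.

    On equal add_time the later list entry wins here; the SQL version
    leaves that tie-break unspecified.
    """
    latest: Dict[str, Event] = {}
    for ev in events:
        current = latest.get(ev["id"])
        if current is None or ev["add_time"] >= current["add_time"]:
            latest[ev["id"]] = ev
    return latest


def apply_cdc_batch(target: Dict[str, Any], events: List[Event]) -> Dict[str, Any]:
    """Apply one batch of CDC events to an in-memory 'target table'."""
    for key, ev in latest_event_per_key(events).items():
        action = ev["action"]
        if key in target:
            if action == "update":
                target[key] = ev["raw_data"]   # WHEN MATCHED ... UPDATE
            elif action == "delete":
                del target[key]                # WHEN MATCHED ... DELETE
            # matched + 'insert': no clause fires, the row is left as-is
        elif action != "delete":
            target[key] = ev["raw_data"]       # WHEN NOT MATCHED ... INSERT
    return target


if __name__ == "__main__":
    table = {"1": {"v": 1}, "2": {"v": 2}}
    batch = [
        {"id": "1", "raw_data": {"v": 10}, "add_time": 1, "action": "update"},
        {"id": "1", "raw_data": {"v": 11}, "add_time": 2, "action": "update"},
        {"id": "2", "raw_data": None, "add_time": 3, "action": "delete"},
        {"id": "3", "raw_data": {"v": 3}, "add_time": 4, "action": "insert"},
    ]
    print(apply_cdc_batch(table, batch))
```

Two edge cases fall out of this model and may be worth settling in the discussion: an `insert` event for a key that already exists in the target is silently ignored, and an insert followed by a delete of the same new key within one batch never reaches the target at all.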

### References

- [Databend MERGE INTO Documentation](https://docs.databend.com/sql/sql-commands/dml/dml-merge)
- [Databend Streams Documentation](https://docs.databend.com/sql/sql-commands/ddl/stream/)

I'd love to hear the community's thoughts on this approach and any suggestions for improvement.

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
