hantmac opened a new issue, #9615:
URL: https://github.com/apache/seatunnel/issues/9615

   ### Search before asking
   
   - [x] I had searched in the 
[feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22)
 and found no similar feature requirement.
   
   
   ### Description
   
   
   ## High-Performance CDC Support for Databend Sink Connector
   
   ### Background
   
   Databend is an OLAP database optimized for analytical workloads with 
columnar storage. In CDC (Change Data Capture) scenarios, executing UPDATE 
and DELETE statements row by row significantly degrades performance and 
fails to leverage Databend's strength in batch processing.
   
   Currently, the Databend sink connector in SeaTunnel supports batch INSERT 
operations, but lacks efficient handling of UPDATE and DELETE operations in CDC 
scenarios.
   
   ### Proposed Solution
   
   I propose implementing a high-performance CDC mode for the Databend sink 
connector using Databend's Stream and MERGE INTO capabilities.
   
   #### Architecture Overview
   
   1. **CDC Mode Activation**: When users specify `conflict_key` in the 
configuration, the connector switches to CDC mode.
   
   2. **Raw Table Design**: Create a raw staging table with the following 
schema:
      ```sql
      CREATE TABLE cdc_raw_table (
          id VARCHAR(255),
          table_name VARCHAR(255),
          raw_data JSON,
          add_time TIMESTAMP,
          action STRING
      )
      ```
   
   3. **Stream Creation**: Create a Databend Stream to monitor changes on the 
raw table:
      ```sql
      CREATE STREAM cdc_stream ON TABLE cdc_raw_table
      ```
   
   4. **Two-Phase Processing**:
      - **Phase 1**: SeaTunnel writes all CDC events (INSERT/UPDATE/DELETE) to 
the raw table as JSON
      - **Phase 2**: A background thread periodically executes MERGE INTO 
operations
   
   5. **Merge Logic**:
      ```sql
      MERGE INTO target_table a 
      USING (
          SELECT 
              raw_data:column1 as column1,
              raw_data:column2 as column2,
              raw_data:column3 as column3,
              action 
          FROM cdc_stream 
          QUALIFY ROW_NUMBER() OVER(PARTITION BY id ORDER BY add_time DESC) = 1
      ) b 
      ON a.id = b.id
      WHEN MATCHED AND b.action = 'update' THEN UPDATE *
      WHEN MATCHED AND b.action = 'delete' THEN DELETE
      WHEN NOT MATCHED AND b.action != 'delete' THEN INSERT *
      ```
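   To make the merge phase concrete, here is a minimal sketch in Java (the 
language SeaTunnel connectors are written in) of how the connector could 
assemble the MERGE INTO statement from the target columns and the configured 
`conflict_key`. The class and method names are hypothetical, not an existing 
SeaTunnel API:
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   
   /** Hypothetical helper that renders the CDC MERGE INTO statement. */
   public class MergeSqlBuilder {
   
       /**
        * Builds a MERGE INTO statement that deduplicates the stream by the
        * conflict key (latest event per key wins) and then applies
        * insert/update/delete against the target table.
        */
       public static String build(String targetTable, String streamName,
                                  String conflictKey, List<String> columns) {
           // Project raw_data:<col> AS <col> for every target column.
           String projection = columns.stream()
                   .map(c -> "raw_data:" + c + " AS " + c)
                   .collect(Collectors.joining(", "));
           return "MERGE INTO " + targetTable + " a USING ("
                   + "SELECT " + projection + ", action FROM " + streamName
                   + " QUALIFY ROW_NUMBER() OVER (PARTITION BY " + conflictKey
                   + " ORDER BY add_time DESC) = 1) b"
                   + " ON a." + conflictKey + " = b." + conflictKey
                   + " WHEN MATCHED AND b.action = 'update' THEN UPDATE *"
                   + " WHEN MATCHED AND b.action = 'delete' THEN DELETE"
                   + " WHEN NOT MATCHED AND b.action != 'delete' THEN INSERT *";
       }
   }
   ```
   
   Generating the statement once per merge cycle also keeps the background 
thread decoupled from the write path.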
   
   ### Benefits
   
   1. **Performance**: Batch processing of CDC events leverages Databend's 
columnar storage optimization
   2. **Efficiency**: Reduces network round trips and transaction overhead
   3. **Scalability**: Handles high-volume CDC workloads effectively
   4. **Consistency**: the Databend Stream transactionally tracks which 
raw-table rows have not yet been consumed, so no events are lost between 
merge cycles, and deduplicating on `add_time` preserves per-key ordering
   
   ### Configuration Example
   
   ```hocon
   sink {
     Databend {
       url = "databend://localhost:8000"
       username = "user"
       password = "password"
       database = "mydb"
       table = "target_table"
       
       # Enable CDC mode
       conflict_key = "id"
       
       # CDC specific configurations
       cdc {
         enable = true
         raw_table = "cdc_raw_table"
         stream_name = "cdc_stream"
         merge_interval_seconds = 30
         merge_batch_size = 10000
       }
     }
   }
   ```
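   For Phase 1, each CDC event would be serialized into the `raw_data` JSON 
column of the raw table. A minimal, hypothetical sketch of that serialization 
(a real implementation would use a JSON library such as Jackson):
   
   ```java
   import java.util.Map;
   
   /** Hypothetical Phase-1 writer: turns a CDC event payload into raw_data JSON. */
   public class RawTableRow {
   
       /** Minimal JSON escaping for a single value (illustration only). */
       static String jsonValue(Object v) {
           if (v == null) return "null";
           if (v instanceof Number || v instanceof Boolean) return v.toString();
           return "\"" + v.toString().replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
       }
   
       /** Serializes the event fields to the JSON stored in raw_data. */
       public static String toRawJson(Map<String, Object> fields) {
           StringBuilder sb = new StringBuilder("{");
           boolean first = true;
           for (Map.Entry<String, Object> e : fields.entrySet()) {
               if (!first) sb.append(",");
               sb.append("\"").append(e.getKey()).append("\":").append(jsonValue(e.getValue()));
               first = false;
           }
           return sb.append("}").toString();
       }
   }
   ```
   
   The connector would then batch these rows into multi-row INSERTs against 
the raw table, keeping the hot write path append-only.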
   
   ### Implementation Considerations
   
   1. **Schema Evolution**: Handle schema changes between source and target
   2. **Error Handling**: Implement retry logic for failed merge operations
   3. **Monitoring**: Expose metrics for CDC lag and processing rate
   4. **Cleanup**: Automatic cleanup of processed records from raw table
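   As one possible shape for the retry logic in consideration 2, a 
hypothetical helper that retries a failed MERGE with exponential backoff 
(names and policy are illustrative, not a committed design):
   
   ```java
   /** Hypothetical retry helper for failed MERGE operations. */
   public class MergeRetry {
   
       /**
        * Runs the merge action, retrying up to maxAttempts times with
        * exponential backoff. Returns the attempt number that succeeded;
        * rethrows the last failure if every attempt fails.
        */
       public static int runWithRetry(Runnable merge, int maxAttempts, long baseBackoffMs) {
           for (int attempt = 1; ; attempt++) {
               try {
                   merge.run();
                   return attempt;
               } catch (RuntimeException e) {
                   if (attempt >= maxAttempts) throw e;
                   try {
                       // Back off 1x, 2x, 4x ... the base interval.
                       Thread.sleep(baseBackoffMs << (attempt - 1));
                   } catch (InterruptedException ie) {
                       Thread.currentThread().interrupt();
                       throw e;
                   }
               }
           }
       }
   }
   ```
   
   Because MERGE INTO consumes the stream only on success, a failed attempt 
leaves the unprocessed events in place for the next retry.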
   
   ### Questions for Discussion
   
   1. Should we support multiple target tables with a single raw table?
   2. What's the best strategy for handling very large transactions?
   3. Should we provide an option to fall back to direct DML for low-volume 
scenarios?
   4. How should we handle concurrent merge operations?
   
   ### References
   
   - [Databend MERGE INTO Documentation](https://docs.databend.com/sql/sql-commands/dml/dml-merge)
   - [Databend Streams Documentation](https://docs.databend.com/sql/sql-commands/ddl/stream/)
   
   I'd love to hear the community's thoughts on this approach and any 
suggestions for improvement.
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

