Dion Ricky Saputra created FLINK-39438:
------------------------------------------

             Summary: Add configurable sink operation (upsert/append) for MaxCompute sink in Flink CDC
                 Key: FLINK-39438
                 URL: https://issues.apache.org/jira/browse/FLINK-39438
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: Dion Ricky Saputra


*Background*
Currently, the MaxCompute sink operation mode (upsert vs. append) is inferred
implicitly from the type of the destination table in MaxCompute: the writer
selects either {{BatchUpsertWriter}} or {{BatchAppendWriter}} using internal
logic (see {{MaxComputeWriter}}). This coupling limits flexibility, especially
when schema evolution is enabled.

When schema evolution is used, tables are created automatically if they do not
exist. These tables are created as transactional tables by default, which
forces the pipeline to always use upsert mode. As a result, users cannot
explicitly control the sink behavior even when append mode is desired.
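To make the current coupling concrete, here is a hypothetical Java sketch of the implicit selection described above. {{TableInfo}}, {{WriterKind}}, {{ImplicitWriterSelection}} and {{autoCreate}} are illustrative names, not Flink CDC or MaxCompute SDK types, and the rule "non-transactional table uses append" is an assumption drawn from this description rather than from the actual {{MaxComputeWriter}} code.

```java
// Hypothetical sketch of the current, implicit behavior. None of these types
// exist in Flink CDC or the MaxCompute SDK; they only model the decision.

/** Which batch writer the sink ends up using. */
enum WriterKind { BATCH_UPSERT, BATCH_APPEND }

/** Minimal stand-in for the metadata of a MaxCompute destination table. */
record TableInfo(String name, boolean transactional) {}

final class ImplicitWriterSelection {

    private ImplicitWriterSelection() {}

    /** The writer is inferred from the table type; no user setting is consulted. */
    static WriterKind select(TableInfo table) {
        // Assumption: transactional tables use upsert, all others use append.
        return table.transactional() ? WriterKind.BATCH_UPSERT : WriterKind.BATCH_APPEND;
    }

    /** Models schema evolution: a missing table is auto-created as transactional. */
    static TableInfo autoCreate(String name) {
        return new TableInfo(name, true);
    }
}
```

With this rule, {{select(autoCreate("orders"))}} always yields {{BATCH_UPSERT}}, which is exactly the limitation described above: nothing in the decision lets the user ask for append.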

*Problem*
 * Sink operation mode cannot be explicitly configured via pipeline YAML.
 * Behavior is tightly coupled with MaxCompute table type.
 * Tables auto-created by schema evolution are transactional, so the sink
always runs in upsert mode.
 * There is no way to override this behavior for append use cases.

*Proposed Solution*
Introduce a new configuration option {{sink.operation}} to explicitly control 
the sink mode.

*Example implementation:*
{code:java}
public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
        ConfigOptions.key("sink.operation")
                .enumType(MaxComputeOptions.SinkOperation.class)
                .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
                .withDescription(
                        "The sink operation type. Supported values are 'upsert' and 'append'; default is 'upsert'.");
{code}
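In the connector itself, Flink CDC's {{ConfigOptions}} machinery would parse the value. The standalone sketch below only illustrates the intended semantics: an absent key falls back to {{upsert}}, and {{append}} selects append mode. The enum mirrors the proposed {{MaxComputeOptions.SinkOperation}}; the {{resolve}} helper, the flat map view of the YAML sink block, and the case-insensitive matching are assumptions of this sketch.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical mirror of the proposed MaxComputeOptions.SinkOperation enum. */
enum SinkOperation {
    UPSERT,
    APPEND;

    /** Value used when 'sink.operation' is absent, as the proposal specifies. */
    static final SinkOperation DEFAULT = UPSERT;

    /**
     * Resolves 'sink.operation' from a flat key/value view of the YAML sink block.
     * Matching ignores case and surrounding whitespace, so 'append' maps to APPEND.
     */
    static SinkOperation resolve(Map<String, String> sinkConfig) {
        String raw = sinkConfig.get("sink.operation");
        if (raw == null) {
            return DEFAULT; // backward-compatible default
        }
        try {
            return valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Reject anything other than 'upsert' or 'append' with a clear message.
            throw new IllegalArgumentException(
                    "Unsupported sink.operation '" + raw + "'; expected 'upsert' or 'append'", e);
        }
    }
}
```

For a sink block containing {{sink.operation: append}}, {{resolve}} returns {{APPEND}}; a pipeline that omits the key keeps {{UPSERT}}, so existing YAML files behave as before.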
{{MaxComputeWriter}} should use this option, instead of inferring from the
table type, to decide which writer to create:
 * {{BatchUpsertWriter}} when {{sink.operation}} is {{upsert}}, or
 * {{BatchAppendWriter}} when {{sink.operation}} is {{append}}.
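A minimal sketch of the proposed selection, assuming the writer can be chosen from the configured operation alone. {{BatchWriter}}, {{UpsertWriterStub}}, {{AppendWriterStub}} and {{WriterSelection}} are hypothetical placeholders for {{BatchUpsertWriter}} and {{BatchAppendWriter}}, whose real constructors and dependencies are not shown in this issue.

```java
/** Hypothetical stand-in for the proposed MaxComputeOptions.SinkOperation. */
enum SinkOperation { UPSERT, APPEND }

/** Placeholder for the common behavior of the two batch writers. */
interface BatchWriter {
    String describe();
}

/** Placeholder for BatchUpsertWriter; real constructor arguments are omitted. */
final class UpsertWriterStub implements BatchWriter {
    @Override
    public String describe() {
        return "BatchUpsertWriter";
    }
}

/** Placeholder for BatchAppendWriter; real constructor arguments are omitted. */
final class AppendWriterStub implements BatchWriter {
    @Override
    public String describe() {
        return "BatchAppendWriter";
    }
}

final class WriterSelection {

    private WriterSelection() {}

    /**
     * Chooses the writer from the configured operation alone. The table's
     * transactional flag is deliberately not a parameter, so auto-created
     * transactional tables no longer force upsert mode.
     */
    static BatchWriter create(SinkOperation operation) {
        return switch (operation) {
            case UPSERT -> new UpsertWriterStub();
            case APPEND -> new AppendWriterStub();
        };
    }
}
```

Because the table type is not an input, a table auto-created as transactional would still get the append writer when {{sink.operation}} is {{append}}. Whether MaxCompute accepts append writes into such a table is outside the scope of this sketch.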

*Example YAML Configuration*
{code:yaml}
sink:
  type: maxcompute
  name: maxcompute
  access-id: ${secret_values.maxcompute_access_id}
  access-key: ${secret_values.maxcompute_access_key}
  endpoint: ${secret_values.maxcompute_endpoint}
  project: maxcompute_project
  quota.name: res_grp
  sink.operation: append
{code}

*Expected Behavior*
 * Users can explicitly choose the sink behavior ({{append}} or {{upsert}})
via configuration.
 * The default remains {{upsert}} for backward compatibility.
 * Writer selection respects the configured option instead of the table type.

*Acceptance Criteria*
 * New config {{sink.operation}} is added and documented.
 * {{MaxComputeWriter}} uses this config to select writer implementation.
 * Backward compatibility is preserved (default = upsert).
 * A {{sink.operation}} value set in YAML overrides the behavior inferred from
the table type.


