Dion Ricky Saputra created FLINK-39438:
------------------------------------------
Summary: Add configurable sink operation (upsert/append) for MaxCompute sink in Flink CDC
Key: FLINK-39438
URL: https://issues.apache.org/jira/browse/FLINK-39438
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: Dion Ricky Saputra
*Background*
Currently, the MaxCompute sink operation mode (upsert vs. append) is inferred
implicitly from the type of the destination table in MaxCompute. The writer
selects either {{BatchUpsertWriter}} or {{BatchAppendWriter}} using internal
logic (see {{MaxComputeWriter}}). This coupling limits flexibility,
especially when schema evolution is enabled.
When schema evolution is enabled, tables that do not exist are created
automatically. These tables are created as transactional tables by default,
which forces the pipeline to always use upsert mode. As a result, users cannot
explicitly control the sink behavior, even when append mode is desired.
*Problem*
* Sink operation mode cannot be explicitly configured via pipeline YAML.
* Behavior is tightly coupled to the MaxCompute table type.
* Tables auto-created by schema evolution are transactional, so the sink always runs in upsert mode.
* There is no way to override this behavior for append-only use cases.
*Proposed Solution*
Introduce a new configuration option {{sink.operation}} to explicitly control
the sink mode.
*Example implementation:*
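{code:java}
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.ConfigOptions;

// Declares the new option; UPSERT stays the default for backward compatibility.
public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
        ConfigOptions.key("sink.operation")
                .enumType(MaxComputeOptions.SinkOperation.class)
                .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
                .withDescription(
                        "The sink operation type. Supported values are 'upsert' and 'append'; the default is 'upsert'.");
{code}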
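The option above references {{MaxComputeOptions.SinkOperation}}, which the issue does not define. Below is a minimal sketch of that enum. It assumes each constant carries the lower-case value used in YAML. The {{fromValue}} helper is hypothetical and only illustrates how {{append}}/{{upsert}} could map onto the constants. In the connector, parsing is done by the Flink CDC configuration layer.

```java
// Hypothetical holder class: the issue references MaxComputeOptions.SinkOperation
// but does not show its definition.
class MaxComputeOptions {

    /** Sink operation modes accepted by the proposed 'sink.operation' option. */
    public enum SinkOperation {
        UPSERT("upsert"),
        APPEND("append");

        private final String value;

        SinkOperation(String value) {
            this.value = value;
        }

        /** Lower-case form as written in pipeline YAML, e.g. "append". */
        public String value() {
            return value;
        }

        /** Hypothetical helper: case-insensitive lookup of a YAML value. */
        public static SinkOperation fromValue(String raw) {
            for (SinkOperation op : values()) {
                if (op.value.equalsIgnoreCase(raw.trim())) {
                    return op;
                }
            }
            throw new IllegalArgumentException(
                    "Unsupported sink.operation '" + raw + "', expected 'upsert' or 'append'");
        }
    }

    public static void main(String[] args) {
        System.out.println(SinkOperation.fromValue("append"));         // APPEND
        System.out.println(SinkOperation.fromValue("UPSERT").value()); // upsert
    }
}
```

Rejecting unknown values with an exception, rather than silently falling back to {{upsert}}, keeps a typo in the YAML from quietly changing write semantics.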
{{MaxComputeWriter}} should then use this option, instead of inferring the mode from the table type, to choose between:
* {{BatchUpsertWriter}} for {{upsert}}, and
* {{BatchAppendWriter}} for {{append}}.
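The writer selection change can be sketched as follows. This is a self-contained illustration, not the actual {{MaxComputeWriter}} code. The writer classes are empty stand-ins. {{selectByTableType}} assumes the current logic reduces to "transactional table means upsert, otherwise append", as the background implies.

```java
// Hypothetical sketch contrasting table-type inference with the proposed option.
// The writer classes are stand-ins, not the real Flink CDC writers.
class WriterSelectionSketch {

    enum SinkOperation { UPSERT, APPEND }

    interface Writer {}

    static final class BatchUpsertWriter implements Writer {}

    static final class BatchAppendWriter implements Writer {}

    /** Current behavior (assumed): the table type alone decides the writer. */
    static Writer selectByTableType(boolean transactionalTable) {
        return transactionalTable ? new BatchUpsertWriter() : new BatchAppendWriter();
    }

    /** Proposed behavior: the configured sink.operation decides the writer. */
    static Writer selectByOption(SinkOperation operation) {
        switch (operation) {
            case APPEND:
                return new BatchAppendWriter();
            case UPSERT:
            default:
                return new BatchUpsertWriter();
        }
    }

    public static void main(String[] args) {
        // Tables auto-created by schema evolution are transactional.
        boolean autoCreatedTable = true;
        // Inference always yields the upsert writer for such tables.
        System.out.println(selectByTableType(autoCreatedTable).getClass().getSimpleName());
        // The explicit option lets the user pick append instead.
        System.out.println(selectByOption(SinkOperation.APPEND).getClass().getSimpleName());
    }
}
```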
*Example YAML Configuration*
{code:yaml}
sink:
  type: maxcompute
  name: maxcompute
  access-id: ${secret_values.maxcompute_access_id}
  access-key: ${secret_values.maxcompute_access_key}
  endpoint: ${secret_values.maxcompute_endpoint}
  project: maxcompute_project
  quota.name: res_grp
  sink.operation: append
{code}
*Expected Behavior*
* Users can explicitly choose the sink behavior ({{append}} or {{upsert}})
via configuration.
* The default remains {{upsert}} for backward compatibility.
* Writer selection respects the configured option instead of the table type.
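The first two expected behaviors can be illustrated with a small resolution sketch. It models the sink block as a flat key/value map. That is an assumption: in the connector, the {{defaultValue}} of the {{ConfigOption}} supplies the fallback. The sketch shows that an explicit {{append}} is honored and that a missing key falls back to {{upsert}}.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: resolves sink.operation from a flat key/value view of the
// sink block, falling back to the backward-compatible default 'upsert'.
class SinkOperationResolution {

    enum SinkOperation { UPSERT, APPEND }

    static final String SINK_OPERATION_KEY = "sink.operation";

    static SinkOperation resolve(Map<String, String> sinkConfig) {
        String raw = sinkConfig.getOrDefault(SINK_OPERATION_KEY, "upsert");
        // valueOf throws IllegalArgumentException for unsupported values.
        return SinkOperation.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> withAppend = new HashMap<>();
        withAppend.put("type", "maxcompute");
        withAppend.put(SINK_OPERATION_KEY, "append");

        Map<String, String> withoutOption = new HashMap<>();
        withoutOption.put("type", "maxcompute");

        System.out.println(resolve(withAppend));    // APPEND
        System.out.println(resolve(withoutOption)); // UPSERT
    }
}
```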
*Acceptance Criteria*
* New config {{sink.operation}} is added and documented.
* {{MaxComputeWriter}} uses this config to select writer implementation.
* Backward compatibility is preserved (default = upsert).
* A {{sink.operation}} value set in YAML overrides the behavior inferred from the table type.