[
https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu updated FLINK-35149:
-------------------------------
Affects Version/s: cdc-3.1.0
> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not
> TwoPhaseCommittingSink
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0
> Reporter: Hongshun Wang
> Priority: Minor
> Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>
> Current , when sink is not instanceof TwoPhaseCommittingSink, use
> input.transform rather than stream. It means that pre-write topology will be
> ignored.
> {code:java}
> private void sinkTo(
> DataStream<Event> input,
> Sink<Event> sink,
> String sinkName,
> OperatorID schemaOperatorID) {
> DataStream<Event> stream = input;
> // Pre write topology
> if (sink instanceof WithPreWriteTopology) {
> stream = ((WithPreWriteTopology<Event>)
> sink).addPreWriteTopology(stream);
> }
> if (sink instanceof TwoPhaseCommittingSink) {
> addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
> } else {
> input.transform(
> SINK_WRITER_PREFIX + sinkName,
> CommittableMessageTypeInfo.noOutput(),
> new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)