This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch FLINK-38729-2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 5b763bd0b505a4d055bec18c88c050fb9807fcc2 Author: lvyanquan <[email protected]> AuthorDate: Mon Mar 9 12:06:59 2026 +0800 Address comments. --- .../flink/cdc/composer/flink/translator/DataSinkTranslator.java | 4 +++- .../flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java index c53d9f275..8a6bde5d1 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java @@ -119,7 +119,9 @@ public class DataSinkTranslator { // Get sink provider EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider(); if (eventSinkProvider == null) { - return; + throw new IllegalStateException( + "DataSink#getEventSinkProvider() must not return null. " + + "Please ensure the sink is correctly configured."); } String sinkName = generateSinkName(sinkDef); if (eventSinkProvider instanceof FlinkSinkProvider) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index 6038045dd..79261bc43 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -44,6 +44,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -65,6 +68,8 @@ import java.util.Set; public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>> implements OneInputStreamOperator<Event, CommittableMessage<CommT>>, BoundedOneInput { + private static final Logger LOG = LoggerFactory.getLogger(DataSinkWriterOperator.class); + private SchemaEvolutionClient schemaEvolutionClient; private final OperatorID schemaOperatorID;
