This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch FLINK-38729-2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 5b763bd0b505a4d055bec18c88c050fb9807fcc2
Author: lvyanquan <[email protected]>
AuthorDate: Mon Mar 9 12:06:59 2026 +0800

    Address comments.
---
 .../flink/cdc/composer/flink/translator/DataSinkTranslator.java      | 4 +++-
 .../flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java     | 5 +++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index c53d9f275..8a6bde5d1 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -119,7 +119,9 @@ public class DataSinkTranslator {
         // Get sink provider
         EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
         if (eventSinkProvider == null) {
-            return;
+            throw new IllegalStateException(
+                    "DataSink#getEventSinkProvider() must not return null. "
+                            + "Please ensure the sink is correctly configured.");
         }
         String sinkName = generateSinkName(sinkDef);
         if (eventSinkProvider instanceof FlinkSinkProvider) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 6038045dd..79261bc43 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -44,6 +44,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -65,6 +68,8 @@ import java.util.Set;
 public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<CommittableMessage<CommT>>
         implements OneInputStreamOperator<Event, CommittableMessage<CommT>>, BoundedOneInput {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataSinkWriterOperator.class);
+
     private SchemaEvolutionClient schemaEvolutionClient;
 
     private final OperatorID schemaOperatorID;

Reply via email to