This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 4f3c4572a9068d14dc02aa1251156233eab97272
Author: Jark Wu <[email protected]>
AuthorDate: Sun Jan 18 00:16:54 2026 +0800

    [flink] Fix the distribution mode support for FlussSink for DataStream API
---
 .../main/java/org/apache/fluss/flink/sink/FlinkSink.java | 16 +++++++++++++++-
 .../main/java/org/apache/fluss/flink/sink/FlussSink.java | 11 ++++++++++-
 .../org/apache/fluss/flink/sink/FlussSinkITCase.java     |  4 ++--
 3 files changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
index 439a21678..b9f23bc8e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
@@ -38,9 +38,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
@@ -56,7 +59,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
 
     private static final long serialVersionUID = 1L;
 
-    private final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder;
+    protected final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> 
builder;
     private final TablePath tablePath;
 
     FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder, 
TablePath tablePath) {
@@ -72,6 +75,17 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
         return flinkSinkWriter;
     }
 
+    /**
+     * {@link FlinkSink} serves as the SQL connector. Since we use {@link 
DataStreamSinkProvider}
+     * (rather than {@link SinkV2Provider}), it does not automatically 
recognize or invoke the
+     * {@link SupportsPreWriteTopology} interface. Therefore, the pre-write 
topology must be added
+     * manually here.
+     *
+     * <p>In contrast, {@link FlussSink} is used directly as a DataStream 
connector; Flink’s runtime
+     * explicitly checks for the {@link SupportsPreWriteTopology} interface 
and automatically
+     * incorporates the pre-write topology if present. To support this path, 
the {@link
+     * SupportsPreWriteTopology} implementation resides in {@link FlussSink}.
+     */
     public DataStreamSink<InputT> apply(DataStream<InputT> input) {
         return builder.addPreWriteTopology(input)
                 .sinkTo(this)
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
index aae36b630..775d4b194 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
@@ -21,6 +21,9 @@ import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
 import org.apache.fluss.metadata.TablePath;
 
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
 /**
  * FlussSink is a specialized Flink sink for writing data to Fluss.
  *
@@ -32,7 +35,8 @@ import org.apache.fluss.metadata.TablePath;
  * @since 0.7
  */
 @PublicEvolving
-public class FlussSink<InputT> extends FlinkSink<InputT> {
+public class FlussSink<InputT> extends FlinkSink<InputT>
+        implements SupportsPreWriteTopology<InputT> {
     private static final long serialVersionUID = 1L;
 
     /**
@@ -46,6 +50,11 @@ public class FlussSink<InputT> extends FlinkSink<InputT> {
         super(builder, tablePath);
     }
 
+    @Override
+    public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
+        return builder.addPreWriteTopology(input);
+    }
+
     /**
      * Creates a new {@link FlussSinkBuilder} instance for building a 
FlussSink.
      *
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
index b88aa4dae..1a5e98eeb 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
@@ -144,7 +144,7 @@ public class FlussSinkITCase extends FlinkTestBase {
                         .setSerializationSchema(serializationSchema)
                         .build();
 
-        flussSink.apply(stream).name("Fluss Sink");
+        stream.sinkTo(flussSink).name("Fluss Sink");
 
         env.executeAsync("Test RowData Fluss Sink");
 
@@ -272,7 +272,7 @@ public class FlussSinkITCase extends FlinkTestBase {
                         .setSerializationSchema(new 
TestOrderSerializationSchema())
                         .build();
 
-        flussSink.apply(stream).name("Fluss Sink");
+        stream.sinkTo(flussSink).name("Fluss Sink");
         env.executeAsync("Test Order Fluss Sink");
 
         Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));

Reply via email to