This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 4f3c4572a9068d14dc02aa1251156233eab97272 Author: Jark Wu <[email protected]> AuthorDate: Sun Jan 18 00:16:54 2026 +0800 [flink] Fix the distribution mode support for FlussSink for DataStream API --- .../main/java/org/apache/fluss/flink/sink/FlinkSink.java | 16 +++++++++++++++- .../main/java/org/apache/fluss/flink/sink/FlussSink.java | 11 ++++++++++- .../org/apache/fluss/flink/sink/FlussSinkITCase.java | 4 ++-- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java index 439a21678..b9f23bc8e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java @@ -38,9 +38,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; @@ -56,7 +59,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { private static final long serialVersionUID = 1L; - private final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder; + protected final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder; private final TablePath tablePath; FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder, TablePath tablePath) { @@ -72,6 +75,17 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { return flinkSinkWriter; } + /** + * {@link FlinkSink} serves as the SQL connector. Since we uses {@link DataStreamSinkProvider} + * (rather than {@link SinkV2Provider}), it does not automatically recognize or invoke the + * {@link SupportsPreWriteTopology} interface. Therefore, the pre-write topology must be added + * manually here. + * + * <p>In contrast, {@link FlussSink} is used directly as a DataStream connector, Flinkās runtime + * explicitly checks for the {@link SupportsPreWriteTopology} interface and automatically + * incorporates the pre-write topology if present. To support this path, the {@link + * SupportsPreWriteTopology} implementation resides in {@link FlussSink}. + */ public DataStreamSink<InputT> apply(DataStream<InputT> input) { return builder.addPreWriteTopology(input) .sinkTo(this) diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java index aae36b630..775d4b194 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java @@ -21,6 +21,9 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.flink.sink.writer.FlinkSinkWriter; import org.apache.fluss.metadata.TablePath; +import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology; +import org.apache.flink.streaming.api.datastream.DataStream; + /** * FlussSink is a specialized Flink sink for writing data to Fluss. * @@ -32,7 +35,8 @@ import org.apache.fluss.metadata.TablePath; * @since 0.7 */ @PublicEvolving -public class FlussSink<InputT> extends FlinkSink<InputT> { +public class FlussSink<InputT> extends FlinkSink<InputT> + implements SupportsPreWriteTopology<InputT> { private static final long serialVersionUID = 1L; /** @@ -46,6 +50,11 @@ public class FlussSink<InputT> extends FlinkSink<InputT> { super(builder, tablePath); } + @Override + public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { + return builder.addPreWriteTopology(input); + } + /** * Creates a new {@link FlussSinkBuilder} instance for building a FlussSink. * diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java index b88aa4dae..1a5e98eeb 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java @@ -144,7 +144,7 @@ public class FlussSinkITCase extends FlinkTestBase { .setSerializationSchema(serializationSchema) .build(); - flussSink.apply(stream).name("Fluss Sink"); + stream.sinkTo(flussSink).name("Fluss Sink"); env.executeAsync("Test RowData Fluss Sink"); @@ -272,7 +272,7 @@ public class FlussSinkITCase extends FlinkTestBase { .setSerializationSchema(new TestOrderSerializationSchema()) .build(); - flussSink.apply(stream).name("Fluss Sink"); + stream.sinkTo(flussSink).name("Fluss Sink"); env.executeAsync("Test Order Fluss Sink"); Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
