This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd673a2f462 [FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
fd673a2f462 is described below

commit fd673a2f46206ff65978f05fcb96b525696aead2
Author: pvary <[email protected]>
AuthorDate: Fri Jan 19 14:04:53 2024 +0100

    [FLINK-33974] Implement the Sink transformation depending on the new SinkV2 interfaces
---
 .../operators/sink/CommitterOperatorFactory.java   | 12 ++-
 .../runtime/operators/sink/SinkWriterOperator.java | 23 +++---
 .../sink/StatefulSinkWriterStateHandler.java       | 13 ++--
 .../translators/SinkTransformationTranslator.java  | 85 ++++++++++++++--------
 .../SinkV2TransformationTranslatorITCase.java      |  9 +--
 .../api/graph/StreamGraphGeneratorTest.java        | 23 ++++++
 .../streaming/util/TestExpandingSinkWithMixin.java | 85 ++++++++++++++++++++++
 7 files changed, 186 insertions(+), 64 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index 1f013ca0580..162f8d04181 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -41,14 +41,12 @@ public final class CommitterOperatorFactory<CommT>
         implements OneInputStreamOperatorFactory<
                 CommittableMessage<CommT>, CommittableMessage<CommT>> {
 
-    private final TwoPhaseCommittingSink<?, CommT> sink;
+    private final SupportsCommitter<CommT> sink;
     private final boolean isBatchMode;
     private final boolean isCheckpointingEnabled;
 
     public CommitterOperatorFactory(
-            TwoPhaseCommittingSink<?, CommT> sink,
-            boolean isBatchMode,
-            boolean isCheckpointingEnabled) {
+            SupportsCommitter<CommT> sink, boolean isBatchMode, boolean 
isCheckpointingEnabled) {
         this.sink = checkNotNull(sink);
         this.isBatchMode = isBatchMode;
         this.isCheckpointingEnabled = isCheckpointingEnabled;
@@ -65,7 +63,7 @@ public final class CommitterOperatorFactory<CommT>
                             processingTimeService,
                             sink.getCommittableSerializer(),
                             context -> sink.createCommitter(context),
-                            sink instanceof WithPostCommitTopology,
+                            sink instanceof SupportsPostCommitTopology,
                             isBatchMode,
                             isCheckpointingEnabled);
             committerOperator.setup(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index c0a9892d5ee..a9d0f682a78 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -113,18 +113,17 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         this.processingTimeService = checkNotNull(processingTimeService);
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
         this.context = new Context<>();
-        this.emitDownstream = sink instanceof TwoPhaseCommittingSink;
+        this.emitDownstream = sink instanceof SupportsCommitter;
 
-        if (sink instanceof StatefulSink) {
+        if (sink instanceof SupportsWriterState) {
             writerStateHandler =
-                    new StatefulSinkWriterStateHandler<>((StatefulSink<InputT, 
?>) sink);
+                    new 
StatefulSinkWriterStateHandler<>((SupportsWriterState<InputT, ?>) sink);
         } else {
             writerStateHandler = new StatelessSinkWriterStateHandler<>(sink);
         }
 
-        if (sink instanceof TwoPhaseCommittingSink) {
-            committableSerializer =
-                    ((TwoPhaseCommittingSink<InputT, CommT>) 
sink).getCommittableSerializer();
+        if (sink instanceof SupportsCommitter) {
+            committableSerializer = ((SupportsCommitter<CommT>) 
sink).getCommittableSerializer();
         } else {
             committableSerializer = null;
         }
@@ -188,13 +187,13 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call 
prepareCommit
             // although no committables are forwarded
-            if (sinkWriter instanceof PrecommittingSinkWriter) {
-                ((PrecommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
+            if (sinkWriter instanceof CommittingSinkWriter) {
+                ((CommittingSinkWriter<?, ?>) sinkWriter).prepareCommit();
             }
             return;
         }
         Collection<CommT> committables =
-                ((PrecommittingSinkWriter<?, CommT>) 
sinkWriter).prepareCommit();
+                ((CommittingSinkWriter<?, CommT>) sinkWriter).prepareCommit();
         StreamingRuntimeContext runtimeContext = getRuntimeContext();
         final int indexOfThisSubtask = 
runtimeContext.getTaskInfo().getIndexOfThisSubtask();
         final int numberOfParallelSubtasks =
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index 2e83a3b8a6c..3c70627a50e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.api.connector.sink2.SupportsWriterState;
 import 
org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
@@ -79,10 +78,12 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
 
     private StatefulSinkWriter<InputT, WriterStateT> sinkWriter;
 
-    public StatefulSinkWriterStateHandler(StatefulSink<InputT, WriterStateT> 
sink) {
-        this.sink = sink;
+    public StatefulSinkWriterStateHandler(SupportsWriterState<InputT, 
WriterStateT> sink) {
+        Preconditions.checkArgument(
+                sink instanceof Sink, "Should be an instance of " + 
Sink.class.getName());
+        this.sink = (Sink<InputT>) sink;
         Collection<String> previousSinkStateNames =
-                sink instanceof StatefulSink.WithCompatibleState
+                sink instanceof SupportsWriterState.WithCompatibleState
                         ? ((WithCompatibleState) 
sink).getCompatibleWriterStateNames()
                         : Collections.emptyList();
         this.writerStateSimpleVersionedSerializer = 
sink.getWriterStateSerializer();
@@ -116,10 +117,6 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
                 Iterables.addAll(states, previousSinkState.get());
             }
 
-            if (!(sink instanceof SupportsWriterState)) {
-                throw new IllegalArgumentException("Sink should implement 
SupportsWriterState");
-            }
-
             sinkWriter = ((SupportsWriterState) 
sink).restoreWriter(initContext, states);
         } else {
             sinkWriter = cast(sink.createWriter(initContext));
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index ce009320659..e924086a1d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -24,15 +24,15 @@ import 
org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
 import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
-import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -44,6 +44,7 @@ import 
org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
 import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -135,16 +136,27 @@ public class SinkTransformationTranslator<Input, Output>
 
             DataStream<T> prewritten = inputStream;
 
-            if (sink instanceof WithPreWriteTopology) {
+            if (sink instanceof SupportsPreWriteTopology) {
                 prewritten =
                         adjustTransformations(
                                 prewritten,
-                                ((WithPreWriteTopology<T>) 
sink)::addPreWriteTopology,
+                                ((SupportsPreWriteTopology<T>) 
sink)::addPreWriteTopology,
                                 true,
                                 sink instanceof 
SupportsConcurrentExecutionAttempts);
             }
 
-            if (sink instanceof TwoPhaseCommittingSink) {
+            if (sink instanceof SupportsPreCommitTopology) {
+                Preconditions.checkArgument(
+                        sink instanceof SupportsCommitter,
+                        "Sink with SupportsPreCommitTopology should implement 
SupportsCommitter");
+            }
+            if (sink instanceof SupportsPostCommitTopology) {
+                Preconditions.checkArgument(
+                        sink instanceof SupportsCommitter,
+                        "Sink with SupportsPostCommitTopology should implement 
SupportsCommitter");
+            }
+
+            if (sink instanceof SupportsCommitter) {
                 addCommittingTopology(sink, prewritten);
             } else {
                 adjustTransformations(
@@ -173,32 +185,27 @@ public class SinkTransformationTranslator<Input, Output>
             }
         }
 
-        private <CommT> void addCommittingTopology(Sink<T> sink, DataStream<T> 
inputStream) {
-            TwoPhaseCommittingSink<T, CommT> committingSink =
-                    (TwoPhaseCommittingSink<T, CommT>) sink;
-            TypeInformation<CommittableMessage<CommT>> typeInformation =
+        private <CommT, WriteResultT> void addCommittingTopology(
+                Sink<T> sink, DataStream<T> inputStream) {
+            SupportsCommitter<CommT> committingSink = 
(SupportsCommitter<CommT>) sink;
+            TypeInformation<CommittableMessage<CommT>> 
committableTypeInformation =
                     
CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);
 
-            DataStream<CommittableMessage<CommT>> written =
-                    adjustTransformations(
-                            inputStream,
-                            input ->
-                                    input.transform(
-                                            WRITER_NAME,
-                                            typeInformation,
-                                            new 
SinkWriterOperatorFactory<>(sink)),
-                            false,
-                            sink instanceof 
SupportsConcurrentExecutionAttempts);
+            DataStream<CommittableMessage<CommT>> precommitted;
+            if (sink instanceof SupportsPreCommitTopology) {
+                SupportsPreCommitTopology<WriteResultT, CommT> 
preCommittingSink =
+                        (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
+                TypeInformation<CommittableMessage<WriteResultT>> 
writeResultTypeInformation =
+                        
CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
 
-            DataStream<CommittableMessage<CommT>> precommitted = 
addFailOverRegion(written);
+                DataStream<CommittableMessage<WriteResultT>> writerResult =
+                        addWriter(sink, inputStream, 
writeResultTypeInformation);
 
-            if (sink instanceof WithPreCommitTopology) {
                 precommitted =
                         adjustTransformations(
-                                precommitted,
-                                ((WithPreCommitTopology<T, CommT>) 
sink)::addPreCommitTopology,
-                                true,
-                                false);
+                                writerResult, 
preCommittingSink::addPreCommitTopology, true, false);
+            } else {
+                precommitted = addWriter(sink, inputStream, 
committableTypeInformation);
             }
 
             DataStream<CommittableMessage<CommT>> committed =
@@ -207,7 +214,7 @@ public class SinkTransformationTranslator<Input, Output>
                             pc ->
                                     pc.transform(
                                             COMMITTER_NAME,
-                                            typeInformation,
+                                            committableTypeInformation,
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
                                                     isBatchMode,
@@ -215,12 +222,12 @@ public class SinkTransformationTranslator<Input, Output>
                             false,
                             false);
 
-            if (sink instanceof WithPostCommitTopology) {
+            if (sink instanceof SupportsPostCommitTopology) {
                 DataStream<CommittableMessage<CommT>> postcommitted = 
addFailOverRegion(committed);
                 adjustTransformations(
                         postcommitted,
                         pc -> {
-                            ((WithPostCommitTopology<T, CommT>) 
sink).addPostCommitTopology(pc);
+                            ((SupportsPostCommitTopology<CommT>) 
sink).addPostCommitTopology(pc);
                             return null;
                         },
                         true,
@@ -228,6 +235,24 @@ public class SinkTransformationTranslator<Input, Output>
             }
         }
 
+        private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> 
addWriter(
+                Sink<T> sink,
+                DataStream<T> inputStream,
+                TypeInformation<CommittableMessage<WriteResultT>> 
typeInformation) {
+            DataStream<CommittableMessage<WriteResultT>> written =
+                    adjustTransformations(
+                            inputStream,
+                            input ->
+                                    input.transform(
+                                            WRITER_NAME,
+                                            typeInformation,
+                                            new 
SinkWriterOperatorFactory<>(sink)),
+                            false,
+                            sink instanceof 
SupportsConcurrentExecutionAttempts);
+
+            return addFailOverRegion(written);
+        }
+
         /**
          * Adds a batch exchange that materializes the output first. This is a 
no-op in STREAMING.
          */
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
index 612cb9d7808..97b23ababac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -67,10 +67,7 @@ public class SinkV2TransformationTranslatorITCase
                         .setWriterUidHash(writerHash)
                         .setCommitterUidHash(committerHash)
                         .build();
-        src.sinkTo(
-                        
TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
-                        operatorsUidHashes)
-                .name(NAME);
+        src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
 
         final StreamGraph streamGraph = env.getStreamGraph();
 
@@ -87,9 +84,7 @@ public class SinkV2TransformationTranslatorITCase
         final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final DataStreamSource<Integer> src = env.fromElements(1, 2);
-        
src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
-                .name(NAME)
-                .uid(sinkUid);
+        src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);
 
         final StreamGraph streamGraph = env.getStreamGraph();
         assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index f2a47b2a712..c9e56cad319 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -74,6 +74,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import 
org.apache.flink.streaming.runtime.translators.CacheTransformationTranslator;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.TestExpandingSink;
+import org.apache.flink.streaming.util.TestExpandingSinkWithMixin;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
@@ -908,6 +909,28 @@ public class StreamGraphGeneratorTest extends TestLogger {
                         });
     }
 
+    @Test
+    public void testAutoParallelismForExpandedTransformationsWithMixin() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        env.setParallelism(2);
+
+        DataStream<Integer> sourceDataStream = env.fromData(1, 2, 3);
+        // Parallelism is set to -1 (default parallelism identifier) to 
imitate the behavior of
+        // the table planner. Parallelism should be set automatically after 
translating.
+        sourceDataStream.sinkTo(new 
TestExpandingSinkWithMixin()).setParallelism(-1);
+
+        StreamGraph graph = env.getStreamGraph();
+
+        graph.getStreamNodes()
+                .forEach(
+                        node -> {
+                            if (!node.getOperatorName().startsWith("Source")) {
+                                
Assertions.assertThat(node.getParallelism()).isEqualTo(2);
+                            }
+                        });
+    }
+
     @Test
     public void testCacheTransformation() {
         final TestingStreamExecutionEnvironment env = new 
TestingStreamExecutionEnvironment();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java
new file mode 100644
index 00000000000..b3c7ba7bc92
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSinkWithMixin.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+
+import java.io.IOException;
+
+/** A test sink that expands into a simple subgraph. Do not use in runtime. */
+public class TestExpandingSinkWithMixin
+        implements Sink<Integer>,
+                SupportsCommitter<Integer>,
+                SupportsPreWriteTopology<Integer>,
+                SupportsPreCommitTopology<Integer, Integer>,
+                SupportsPostCommitTopology<Integer> {
+
+    @Override
+    public void addPostCommitTopology(DataStream<CommittableMessage<Integer>> 
committables) {
+        committables.sinkTo(new DiscardingSink<>());
+    }
+
+    @Override
+    public DataStream<CommittableMessage<Integer>> addPreCommitTopology(
+            DataStream<CommittableMessage<Integer>> committables) {
+        return committables.map(value -> 
value).returns(committables.getType());
+    }
+
+    @Override
+    public DataStream<Integer> addPreWriteTopology(DataStream<Integer> 
inputDataStream) {
+        return inputDataStream.map(new NoOpIntMap());
+    }
+
+    @Override
+    public SinkWriter<Integer> createWriter(WriterInitContext context) throws 
IOException {
+        return null;
+    }
+
+    @Override
+    public SinkWriter<Integer> createWriter(InitContext context) throws 
IOException {
+        return null;
+    }
+
+    @Override
+    public Committer<Integer> createCommitter(CommitterInitContext context) {
+        return null;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
+        return null;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Integer> getWriteResultSerializer() {
+        return null;
+    }
+}

Reply via email to