This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92951a05127 [FLINK-33295] Separate SinkV2 and SinkV1Adapter tests
92951a05127 is described below

commit 92951a05127f1e0e2ab0ea04ae022659fc5276ab
Author: pvary <[email protected]>
AuthorDate: Wed Nov 8 17:55:43 2023 +0100

    [FLINK-33295] Separate SinkV2 and SinkV1Adapter tests
    
    Co-authored-by: Peter Vary <[email protected]>
---
 .../base/sink/writer/TestSinkInitContext.java      |   4 +-
 .../connector/file/sink/writer/FileWriterTest.java |   8 +-
 .../groups/InternalSinkWriterMetricGroup.java      |  16 +-
 .../metrics/groups/MetricsGroupTestUtils.java      |  47 +++
 .../api/datastream/DataStreamSinkTest.java         |   8 +-
 .../streaming/api/functions/PrintSinkTest.java     |   5 +-
 .../SinkTransformationTranslatorITCaseBase.java    | 225 +++++++++++
 ...a => SinkV1TransformationTranslatorITCase.java} | 190 +--------
 .../SinkV2TransformationTranslatorITCase.java      | 100 +++++
 ...torTest.java => CommitterOperatorTestBase.java} | 122 ++----
 .../runtime/operators/sink/SinkTestUtil.java       |   4 +-
 .../sink/SinkV2CommitterOperatorTest.java          |  75 ++++
 .../sink/SinkV2SinkWriterOperatorTest.java         | 149 +++++++
 ...orTest.java => SinkWriterOperatorTestBase.java} | 212 ++++------
 .../streaming/runtime/operators/sink/TestSink.java |  50 +--
 .../runtime/operators/sink/TestSinkV2.java         | 434 +++++++++++++++++++++
 .../sink/WithAdapterCommitterOperatorTest.java     |  83 ++++
 .../sink/WithAdapterSinkWriterOperatorTest.java    | 132 +++++++
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 138 +++++++
 .../streaming/runtime/SinkV2MetricsITCase.java     | 183 +++++++++
 20 files changed, 1725 insertions(+), 460 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index 601d4f9d427..1f70b03413c 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -28,7 +28,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -51,7 +51,7 @@ public class TestSinkInitContext implements Sink.InitContext {
     private final OperatorIOMetricGroup operatorIOMetricGroup =
             
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
     private final SinkWriterMetricGroup metricGroup =
-            InternalSinkWriterMetricGroup.mock(
+            MetricsGroupTestUtils.mockWriterMetricGroup(
                     metricListener.getMetricGroup(), operatorIOMetricGroup);
     private final MailboxExecutor mailboxExecutor;
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 0149bf3e6da..cd0dda1d978 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
@@ -292,7 +292,7 @@ class FileWriterTest {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
         final SinkWriterMetricGroup sinkWriterMetricGroup =
-                InternalSinkWriterMetricGroup.mock(
+                MetricsGroupTestUtils.mockWriterMetricGroup(
                         metricListener.getMetricGroup(), 
operatorIOMetricGroup);
 
         Counter recordsCounter = 
sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
@@ -471,7 +471,7 @@ class FileWriterTest {
                 basePath,
                 rollingPolicy,
                 outputFileConfig,
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+                
MetricsGroupTestUtils.mockWriterMetricGroup(metricListener.getMetricGroup()));
     }
 
     private FileWriter<String> createWriter(
@@ -484,7 +484,7 @@ class FileWriterTest {
             throws IOException {
         return new FileWriter<>(
                 basePath,
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                
MetricsGroupTestUtils.mockWriterMetricGroup(metricListener.getMetricGroup()),
                 bucketAssigner,
                 new DefaultFileWriterBucketFactory<>(),
                 new RowWiseBucketWriter<>(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
index 81aa8d78ce3..27d0c72ed5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java
@@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.metrics.MetricNames;
 
 /** Special {@link org.apache.flink.metrics.MetricGroup} representing an 
Operator. */
@@ -40,7 +39,8 @@ public class InternalSinkWriterMetricGroup extends 
ProxyMetricGroup<MetricGroup>
     private final Counter numBytesWritten;
     private final OperatorIOMetricGroup operatorIOMetricGroup;
 
-    private InternalSinkWriterMetricGroup(
+    @VisibleForTesting
+    InternalSinkWriterMetricGroup(
             MetricGroup parentMetricGroup, OperatorIOMetricGroup 
operatorIOMetricGroup) {
         super(parentMetricGroup);
         numRecordsOutErrors = 
parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
@@ -61,18 +61,6 @@ public class InternalSinkWriterMetricGroup extends 
ProxyMetricGroup<MetricGroup>
                 operatorMetricGroup, operatorMetricGroup.getIOMetricGroup());
     }
 
-    @VisibleForTesting
-    public static InternalSinkWriterMetricGroup mock(MetricGroup metricGroup) {
-        return new InternalSinkWriterMetricGroup(
-                metricGroup, 
UnregisteredMetricsGroup.createOperatorIOMetricGroup());
-    }
-
-    @VisibleForTesting
-    public static InternalSinkWriterMetricGroup mock(
-            MetricGroup metricGroup, OperatorIOMetricGroup 
operatorIOMetricGroup) {
-        return new InternalSinkWriterMetricGroup(metricGroup, 
operatorIOMetricGroup);
-    }
-
     @Override
     public OperatorIOMetricGroup getIOMetricGroup() {
         return operatorIOMetricGroup;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java
new file mode 100644
index 00000000000..fae09a39e9b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricsGroupTestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+/** Util class to create metric groups for SinkV2 tests. */
+public class MetricsGroupTestUtils {
+
+    @VisibleForTesting
+    public static InternalSinkWriterMetricGroup mockWriterMetricGroup() {
+        return new InternalSinkWriterMetricGroup(
+                new UnregisteredMetricsGroup(),
+                UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+    }
+
+    @VisibleForTesting
+    public static InternalSinkWriterMetricGroup 
mockWriterMetricGroup(MetricGroup metricGroup) {
+        return new InternalSinkWriterMetricGroup(
+                metricGroup, 
UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+    }
+
+    @VisibleForTesting
+    public static InternalSinkWriterMetricGroup mockWriterMetricGroup(
+            MetricGroup metricGroup, OperatorIOMetricGroup 
operatorIOMetricGroup) {
+        return new InternalSinkWriterMetricGroup(metricGroup, 
operatorIOMetricGroup);
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
index fb2e9d4cceb..401ed55215c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 
 import org.junit.Test;
 
@@ -33,13 +33,15 @@ public class DataStreamSinkTest {
     public void testGettingTransformationWithNewSinkAPI() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         final Transformation<?> transformation =
-                env.fromElements(1, 
2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+                env.fromElements(1, 2)
+                        .sinkTo(TestSinkV2.<Integer>newBuilder().build())
+                        .getTransformation();
         assertTrue(transformation instanceof SinkTransformation);
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void throwExceptionWhenSetUidWithNewSinkAPI() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.fromElements(1, 
2).sinkTo(TestSink.newBuilder().build()).setUidHash("Test");
+        env.fromElements(1, 
2).sinkTo(TestSinkV2.<Integer>newBuilder().build()).setUidHash("Test");
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index e8760d6d5de..352118db5d4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -26,8 +26,7 @@ import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.streaming.api.functions.sink.PrintSink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -200,7 +199,7 @@ class PrintSinkTest {
 
         @Override
         public SinkWriterMetricGroup metricGroup() {
-            return InternalSinkWriterMetricGroup.mock(new 
UnregisteredMetricsGroup());
+            return MetricsGroupTestUtils.mockWriterMetricGroup();
         }
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
new file mode 100644
index 00000000000..3c93b178b51
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link 
org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or 
re-order test cases.
+ */
+@RunWith(Parameterized.class)
+public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends 
TestLogger {
+
+    @Parameterized.Parameters(name = "Execution Mode: {0}")
+    public static Collection<Object> data() {
+        return Arrays.asList(RuntimeExecutionMode.STREAMING, 
RuntimeExecutionMode.BATCH);
+    }
+
+    @Parameterized.Parameter() public RuntimeExecutionMode 
runtimeExecutionMode;
+
+    static final String NAME = "FileSink";
+    static final String SLOT_SHARE_GROUP = "FileGroup";
+    static final String UID = "FileUid";
+    static final int PARALLELISM = 2;
+
+    abstract SinkT simpleSink();
+
+    abstract SinkT sinkWithCommitter();
+
+    abstract DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, SinkT 
sink);
+
+    @Test
+    public void generateWriterTopology() {
+        final StreamGraph streamGraph = buildGraph(simpleSink(), 
runtimeExecutionMode);
+
+        final StreamNode sourceNode = findNodeName(streamGraph, node -> 
node.contains("Source"));
+        final StreamNode writerNode = findWriter(streamGraph);
+
+        assertThat(streamGraph.getStreamNodes().size(), equalTo(2));
+
+        validateTopology(
+                sourceNode,
+                IntSerializer.class,
+                writerNode,
+                SinkWriterOperatorFactory.class,
+                PARALLELISM,
+                -1);
+    }
+
+    @Test
+    public void generateWriterCommitterTopology() {
+
+        final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), 
runtimeExecutionMode);
+
+        final StreamNode sourceNode = findNodeName(streamGraph, node -> 
node.contains("Source"));
+        final StreamNode writerNode = findWriter(streamGraph);
+
+        validateTopology(
+                sourceNode,
+                IntSerializer.class,
+                writerNode,
+                SinkWriterOperatorFactory.class,
+                PARALLELISM,
+                -1);
+
+        final StreamNode committerNode =
+                findNodeName(streamGraph, name -> name.contains("Committer"));
+
+        assertThat(streamGraph.getStreamNodes().size(), equalTo(3));
+
+        validateTopology(
+                writerNode,
+                SimpleVersionedSerializerTypeSerializerProxy.class,
+                committerNode,
+                CommitterOperatorFactory.class,
+                PARALLELISM,
+                -1);
+    }
+
+    StreamNode findWriter(StreamGraph streamGraph) {
+        return findNodeName(
+                streamGraph, name -> name.contains("Writer") && 
!name.contains("Committer"));
+    }
+
+    StreamNode findCommitter(StreamGraph streamGraph) {
+        return findNodeName(
+                streamGraph,
+                name -> name.contains("Committer") && !name.contains("Global 
Committer"));
+    }
+
+    StreamNode findGlobalCommitter(StreamGraph streamGraph) {
+        return findNodeName(streamGraph, name -> name.contains("Global 
Committer"));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void throwExceptionWithoutSettingUid() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final Configuration config = new Configuration();
+        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
+        env.configure(config, getClass().getClassLoader());
+        // disable auto generating uid
+        env.getConfig().disableAutoGeneratedUIDs();
+        sinkTo(env.fromElements(1, 2), simpleSink());
+        env.getStreamGraph();
+    }
+
+    @Test
+    public void disableOperatorChain() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        final DataStreamSink<Integer> dataStreamSink = sinkTo(src, 
sinkWithCommitter()).name(NAME);
+        dataStreamSink.disableChaining();
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        final StreamNode writer = findWriter(streamGraph);
+        final StreamNode committer = findCommitter(streamGraph);
+
+        assertThat(writer.getOperatorFactory().getChainingStrategy(), 
is(ChainingStrategy.NEVER));
+        assertThat(
+                committer.getOperatorFactory().getChainingStrategy(), 
is(ChainingStrategy.NEVER));
+    }
+
+    void validateTopology(
+            StreamNode src,
+            Class<?> srcOutTypeInfo,
+            StreamNode dest,
+            Class<? extends StreamOperatorFactory> operatorFactoryClass,
+            int expectedParallelism,
+            int expectedMaxParallelism) {
+
+        // verify src node
+        final StreamEdge srcOutEdge = src.getOutEdges().get(0);
+        assertThat(srcOutEdge.getTargetId(), equalTo(dest.getId()));
+        assertThat(src.getTypeSerializerOut(), instanceOf(srcOutTypeInfo));
+
+        // verify dest node input
+        final StreamEdge destInputEdge = dest.getInEdges().get(0);
+        assertThat(destInputEdge.getSourceId(), equalTo(src.getId()));
+        assertThat(dest.getTypeSerializersIn()[0], instanceOf(srcOutTypeInfo));
+
+        // make sure 2 sink operators have different names/uid
+        assertThat(dest.getOperatorName(), 
not(equalTo(src.getOperatorName())));
+        assertThat(dest.getTransformationUID(), 
not(equalTo(src.getTransformationUID())));
+
+        assertThat(dest.getOperatorFactory(), 
instanceOf(operatorFactoryClass));
+        assertThat(dest.getParallelism(), equalTo(expectedParallelism));
+        assertThat(dest.getMaxParallelism(), equalTo(expectedMaxParallelism));
+        assertThat(dest.getOperatorFactory().getChainingStrategy(), 
is(ChainingStrategy.ALWAYS));
+        assertThat(dest.getSlotSharingGroup(), equalTo(SLOT_SHARE_GROUP));
+    }
+
+    StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode 
runtimeExecutionMode) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final Configuration config = new Configuration();
+        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
+        env.configure(config, getClass().getClassLoader());
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        final DataStreamSink<Integer> dataStreamSink = sinkTo(src.rebalance(), 
sink);
+        setSinkProperty(dataStreamSink);
+        // Trigger the plan generation but do not clear the transformations
+        env.getExecutionPlan();
+        return env.getStreamGraph();
+    }
+
+    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
+        dataStreamSink.name(NAME);
+        dataStreamSink.uid(UID);
+        
dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM);
+        dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
+    }
+
+    StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> 
predicate) {
+        return streamGraph.getStreamNodes().stream()
+                .filter(node -> predicate.test(node.getOperatorName()))
+                .findFirst()
+                .orElseThrow(() -> new IllegalStateException("Can not find the 
node"));
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
similarity index 52%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
index 4d0b2323c78..1b5bd22142f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
@@ -20,33 +20,23 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
 import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
 import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
-import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.function.Predicate;
-
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
@@ -56,69 +46,22 @@ import static org.junit.Assert.assertEquals;
  * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or 
re-order test cases.
  */
 @RunWith(Parameterized.class)
-public class SinkTransformationTranslatorITCase extends TestLogger {
+public class SinkV1TransformationTranslatorITCase
+        extends SinkTransformationTranslatorITCaseBase<Sink<Integer, ?, ?, ?>> 
{
 
-    @Parameterized.Parameters(name = "Execution Mode: {0}")
-    public static Collection<Object> data() {
-        return Arrays.asList(RuntimeExecutionMode.STREAMING, 
RuntimeExecutionMode.BATCH);
+    @Override
+    Sink<Integer, ?, ?, ?> simpleSink() {
+        return TestSink.newBuilder().build();
     }
 
-    @Parameterized.Parameter() public RuntimeExecutionMode 
runtimeExecutionMode;
-
-    static final String NAME = "FileSink";
-    static final String SLOT_SHARE_GROUP = "FileGroup";
-    static final String UID = "FileUid";
-    static final int PARALLELISM = 2;
-
-    @Test
-    public void generateWriterTopology() {
-        final StreamGraph streamGraph =
-                buildGraph(TestSink.newBuilder().build(), 
runtimeExecutionMode);
-
-        final StreamNode sourceNode = findNodeName(streamGraph, node -> 
node.contains("Source"));
-        final StreamNode writerNode = findWriter(streamGraph);
-
-        assertThat(streamGraph.getStreamNodes().size(), equalTo(2));
-
-        validateTopology(
-                sourceNode,
-                IntSerializer.class,
-                writerNode,
-                SinkWriterOperatorFactory.class,
-                PARALLELISM,
-                -1);
+    @Override
+    Sink<Integer, ?, ?, ?> sinkWithCommitter() {
+        return TestSink.newBuilder().setDefaultCommitter().build();
     }
 
-    @Test
-    public void generateWriterCommitterTopology() {
-
-        final StreamGraph streamGraph =
-                buildGraph(
-                        TestSink.newBuilder().setDefaultCommitter().build(), 
runtimeExecutionMode);
-
-        final StreamNode sourceNode = findNodeName(streamGraph, node -> 
node.contains("Source"));
-        final StreamNode writerNode = findWriter(streamGraph);
-
-        validateTopology(
-                sourceNode,
-                IntSerializer.class,
-                writerNode,
-                SinkWriterOperatorFactory.class,
-                PARALLELISM,
-                -1);
-
-        final StreamNode committerNode =
-                findNodeName(streamGraph, name -> name.contains("Committer"));
-
-        assertThat(streamGraph.getStreamNodes().size(), equalTo(3));
-
-        validateTopology(
-                writerNode,
-                SimpleVersionedSerializerTypeSerializerProxy.class,
-                committerNode,
-                CommitterOperatorFactory.class,
-                PARALLELISM,
-                -1);
+    @Override
+    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer, 
?, ?, ?> sink) {
+        return stream.sinkTo(sink);
     }
 
     @Test
@@ -219,56 +162,6 @@ public class SinkTransformationTranslatorITCase extends 
TestLogger {
                 1);
     }
 
-    private StreamNode findWriter(StreamGraph streamGraph) {
-        return findNodeName(
-                streamGraph, name -> name.contains("Writer") && 
!name.contains("Committer"));
-    }
-
-    private StreamNode findCommitter(StreamGraph streamGraph) {
-        return findNodeName(streamGraph, name -> name.contains("Committer"));
-    }
-
-    private StreamNode findGlobalCommitter(StreamGraph streamGraph) {
-        return findNodeName(streamGraph, name -> name.contains("Global 
Committer"));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void throwExceptionWithoutSettingUid() {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        final Configuration config = new Configuration();
-        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
-        env.configure(config, getClass().getClassLoader());
-        // disable auto generating uid
-        env.getConfig().disableAutoGeneratedUIDs();
-        env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build());
-        env.getStreamGraph();
-    }
-
-    @Test
-    public void disableOperatorChain() {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        final DataStreamSource<Integer> src = env.fromElements(1, 2);
-        final DataStreamSink<Integer> dataStreamSink =
-                src.sinkTo(
-                                TestSink.newBuilder()
-                                        .setDefaultCommitter()
-                                        .setDefaultGlobalCommitter()
-                                        .build())
-                        .name(NAME);
-        dataStreamSink.disableChaining();
-
-        final StreamGraph streamGraph = env.getStreamGraph();
-        final StreamNode writer = findWriter(streamGraph);
-        final StreamNode globalCommitter = findCommitter(streamGraph);
-
-        assertThat(writer.getOperatorFactory().getChainingStrategy(), 
is(ChainingStrategy.NEVER));
-        assertThat(
-                globalCommitter.getOperatorFactory().getChainingStrategy(),
-                is(ChainingStrategy.NEVER));
-    }
-
     @Test
     public void testSettingOperatorUidHash() {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -319,61 +212,4 @@ public class SinkTransformationTranslatorITCase extends 
TestLogger {
                 findGlobalCommitter(streamGraph).getTransformationUID(),
                 String.format("Sink %s Global Committer", sinkUid));
     }
-
-    private void validateTopology(
-            StreamNode src,
-            Class<?> srcOutTypeInfo,
-            StreamNode dest,
-            Class<? extends StreamOperatorFactory> operatorFactoryClass,
-            int expectedParallelism,
-            int expectedMaxParallelism) {
-
-        // verify src node
-        final StreamEdge srcOutEdge = src.getOutEdges().get(0);
-        assertThat(srcOutEdge.getTargetId(), equalTo(dest.getId()));
-        assertThat(src.getTypeSerializerOut(), instanceOf(srcOutTypeInfo));
-
-        // verify dest node input
-        final StreamEdge destInputEdge = dest.getInEdges().get(0);
-        assertThat(destInputEdge.getSourceId(), equalTo(src.getId()));
-        assertThat(dest.getTypeSerializersIn()[0], instanceOf(srcOutTypeInfo));
-
-        // make sure 2 sink operators have different names/uid
-        assertThat(dest.getOperatorName(), 
not(equalTo(src.getOperatorName())));
-        assertThat(dest.getTransformationUID(), 
not(equalTo(src.getTransformationUID())));
-
-        assertThat(dest.getOperatorFactory(), 
instanceOf(operatorFactoryClass));
-        assertThat(dest.getParallelism(), equalTo(expectedParallelism));
-        assertThat(dest.getMaxParallelism(), equalTo(expectedMaxParallelism));
-        assertThat(dest.getOperatorFactory().getChainingStrategy(), 
is(ChainingStrategy.ALWAYS));
-        assertThat(dest.getSlotSharingGroup(), equalTo(SLOT_SHARE_GROUP));
-    }
-
-    private StreamGraph buildGraph(TestSink sink, RuntimeExecutionMode 
runtimeExecutionMode) {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        final Configuration config = new Configuration();
-        config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
-        env.configure(config, getClass().getClassLoader());
-        final DataStreamSource<Integer> src = env.fromElements(1, 2);
-        final DataStreamSink<Integer> dataStreamSink = 
src.rebalance().sinkTo(sink);
-        setSinkProperty(dataStreamSink);
-        // Trigger the plan generation but do not clear the transformations
-        env.getExecutionPlan();
-        return env.getStreamGraph();
-    }
-
-    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
-        dataStreamSink.name(NAME);
-        dataStreamSink.uid(UID);
-        
dataStreamSink.setParallelism(SinkTransformationTranslatorITCase.PARALLELISM);
-        dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
-    }
-
-    private StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> 
predicate) {
-        return streamGraph.getStreamNodes().stream()
-                .filter(node -> predicate.test(node.getOperatorName()))
-                .findFirst()
-                .orElseThrow(() -> new IllegalStateException("Can not find the 
node"));
-    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
new file mode 100644
index 00000000000..612cb9d7808
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link 
org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or 
re-order test cases.
+ */
+@RunWith(Parameterized.class)
+public class SinkV2TransformationTranslatorITCase
+        extends SinkTransformationTranslatorITCaseBase<Sink<Integer>> {
+
+    @Override
+    Sink<Integer> simpleSink() {
+        return TestSinkV2.<Integer>newBuilder().build();
+    }
+
+    @Override
+    Sink<Integer> sinkWithCommitter() {
+        return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
+    }
+
+    @Override
+    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer> 
sink) {
+        return stream.sinkTo(sink);
+    }
+
+    @Test
+    public void testSettingOperatorUidHash() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
+        final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
+        final CustomSinkOperatorUidHashes operatorsUidHashes =
+                CustomSinkOperatorUidHashes.builder()
+                        .setWriterUidHash(writerHash)
+                        .setCommitterUidHash(committerHash)
+                        .build();
+        src.sinkTo(
+                        
TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build(),
+                        operatorsUidHashes)
+                .name(NAME);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+
+        assertEquals(findWriter(streamGraph).getUserHash(), writerHash);
+        assertEquals(findCommitter(streamGraph).getUserHash(), committerHash);
+    }
+
+    /**
+     * When ever you need to change something in this test case please think 
about possible state
+     * upgrade problems introduced by your changes.
+     */
+    @Test
+    public void testSettingOperatorUids() {
+        final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        
src.sinkTo(TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build())
+                .name(NAME)
+                .uid(sinkUid);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+        assertEquals(
+                findCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink Committer: %s", sinkUid));
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
similarity index 77%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index 7ae860c54e0..fe3625ec8ce 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -34,45 +32,31 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.function.IntSupplier;
 
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage;
 import static org.assertj.core.api.Assertions.assertThat;
 
-class CommitterOperatorTest {
+abstract class CommitterOperatorTestBase {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testEmitCommittables(boolean withPostCommitTopology) throws Exception 
{
-        final ForwardingCommitter committer = new ForwardingCommitter();
-
-        Sink<Integer> sink;
+        SinkAndCounters sinkAndCounters;
         if (withPostCommitTopology) {
             // Insert global committer to simulate post commit topology
-            sink =
-                    TestSink.newBuilder()
-                            .setCommitter(committer)
-                            .setDefaultGlobalCommitter()
-                            
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
-                            .build()
-                            .asV2();
+            sinkAndCounters = sinkWithPostCommit();
         } else {
-            sink =
-                    TestSink.newBuilder()
-                            .setCommitter(committer)
-                            
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
-                            .build()
-                            .asV2();
+            sinkAndCounters = sinkWithoutPostCommit();
         }
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
-                                new CommitterOperatorFactory<>(
-                                        (TwoPhaseCommittingSink<?, String>) 
sink, false, true));
+                                new 
CommitterOperatorFactory<>(sinkAndCounters.sink, false, true));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -85,7 +69,7 @@ class CommitterOperatorTest {
         // Trigger commit
         testHarness.notifyOfCompletedCheckpoint(1);
 
-        assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
         if (withPostCommitTopology) {
             final List<StreamElement> output = 
fromOutput(testHarness.getOutput());
             SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
@@ -102,10 +86,10 @@ class CommitterOperatorTest {
 
     @Test
     void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws 
Exception {
-        final ForwardingCommitter committer = new ForwardingCommitter();
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, 
true);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -119,7 +103,7 @@ class CommitterOperatorTest {
         testHarness.notifyOfCompletedCheckpoint(1);
 
         assertThat(testHarness.getOutput()).isEmpty();
-        assertThat(committer.getSuccessfulCommits()).isEqualTo(0);
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isZero();
 
         final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("2", 1L, 1);
         testHarness.processElement(new StreamRecord<>(second));
@@ -129,7 +113,7 @@ class CommitterOperatorTest {
 
         final List<StreamElement> output = fromOutput(testHarness.getOutput());
         assertThat(output).hasSize(3);
-        assertThat(committer.getSuccessfulCommits()).isEqualTo(2);
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
         SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
                 
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                 
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
@@ -143,10 +127,11 @@ class CommitterOperatorTest {
 
     @Test
     void testImmediatelyCommitLateCommittables() throws Exception {
-        final ForwardingCommitter committer = new ForwardingCommitter();
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
+
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer, false, true);
+                testHarness = createTestHarness(sinkAndCounters.sink, false, 
true);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -166,7 +151,7 @@ class CommitterOperatorTest {
 
         final List<StreamElement> output = fromOutput(testHarness.getOutput());
         assertThat(output).hasSize(2);
-        assertThat(committer.getSuccessfulCommits()).isEqualTo(1);
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
         SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
                 
.hasFailedCommittables(committableSummary.getNumberOfFailedCommittables())
                 
.hasOverallCommittables(committableSummary.getNumberOfCommittables())
@@ -179,10 +164,10 @@ class CommitterOperatorTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
-        final ForwardingCommitter committer = new ForwardingCommitter();
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer, isBatchMode, 
!isBatchMode);
+                testHarness = createTestHarness(sinkAndCounters.sink, 
isBatchMode, !isBatchMode);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -199,7 +184,7 @@ class CommitterOperatorTest {
 
         testHarness.endInput();
         if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).hasSize(0);
+            assertThat(testHarness.getOutput()).isEmpty();
             // notify final checkpoint complete
             testHarness.notifyOfCompletedCheckpoint(1);
         }
@@ -227,7 +212,7 @@ class CommitterOperatorTest {
                         CommittableMessage<String>, CommittableMessage<String>>
                 testHarness =
                         createTestHarness(
-                                new TestSink.RetryOnceCommitter(),
+                                sinkWithPostCommitWithRetry().sink,
                                 false,
                                 true,
                                 1,
@@ -262,15 +247,15 @@ class CommitterOperatorTest {
         assertThat(testHarness.getOutput()).isEmpty();
         testHarness.close();
 
-        final ForwardingCommitter committer = new ForwardingCommitter();
-
         // create new testHarness but with different parallelism level and 
subtaskId that original
         // one.
         // we will make sure that new subtaskId was used during committable 
recovery.
+        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
                 restored =
-                        createTestHarness(committer, false, true, 10, 10, 
subtaskIdAfterRecovery);
+                        createTestHarness(
+                                sinkAndCounters.sink, false, true, 10, 10, 
subtaskIdAfterRecovery);
 
         restored.initializeState(snapshot);
         restored.open();
@@ -278,7 +263,7 @@ class CommitterOperatorTest {
         // Previous committables are immediately committed if possible
         final List<StreamElement> output = fromOutput(restored.getOutput());
         assertThat(output).hasSize(3);
-        assertThat(committer.getSuccessfulCommits()).isEqualTo(2);
+        assertThat(sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
         SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
                 .hasCheckpointId(checkpointId)
                 .hasFailedCommittables(0)
@@ -300,22 +285,14 @@ class CommitterOperatorTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
-        final Sink<Integer> sink =
-                TestSink.newBuilder()
-                        .setDefaultCommitter()
-                        .setDefaultGlobalCommitter()
-                        
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
-                        .build()
-                        .asV2();
+        final SinkAndCounters sinkAndCounters = sinkWithPostCommit();
 
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
-                                        (TwoPhaseCommittingSink<?, String>) 
sink,
-                                        false,
-                                        isCheckpointingEnabled));
+                                        sinkAndCounters.sink, false, 
isCheckpointingEnabled));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -363,28 +340,18 @@ class CommitterOperatorTest {
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
-                    Committer<String> committer,
+                    TwoPhaseCommittingSink<?, String> sink,
                     boolean isBatchMode,
                     boolean isCheckpointingEnabled)
                     throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(
-                        (TwoPhaseCommittingSink<?, String>)
-                                TestSink.newBuilder()
-                                        .setCommitter(committer)
-                                        .setDefaultGlobalCommitter()
-                                        .setCommittableSerializer(
-                                                
TestSink.StringCommittableSerializer.INSTANCE)
-                                        .build()
-                                        .asV2(),
-                        isBatchMode,
-                        isCheckpointingEnabled));
+                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled));
     }
 
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
-                    Committer<String> committer,
+                    TwoPhaseCommittingSink<?, String> sink,
                     boolean isBatchMode,
                     boolean isCheckpointingEnabled,
                     int maxParallelism,
@@ -392,36 +359,25 @@ class CommitterOperatorTest {
                     int subtaskId)
                     throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
-                new CommitterOperatorFactory<>(
-                        (TwoPhaseCommittingSink<?, String>)
-                                TestSink.newBuilder()
-                                        .setCommitter(committer)
-                                        .setDefaultGlobalCommitter()
-                                        .setCommittableSerializer(
-                                                
TestSink.StringCommittableSerializer.INSTANCE)
-                                        .build()
-                                        .asV2(),
-                        isBatchMode,
-                        isCheckpointingEnabled),
+                new CommitterOperatorFactory<>(sink, isBatchMode, 
isCheckpointingEnabled),
                 maxParallelism,
                 parallelism,
                 subtaskId);
     }
 
-    private static class ForwardingCommitter extends TestSink.DefaultCommitter 
{
-        private int successfulCommits = 0;
+    abstract SinkAndCounters sinkWithPostCommit();
 
-        @Override
-        public List<String> commit(List<String> committables) {
-            successfulCommits += committables.size();
-            return Collections.emptyList();
-        }
+    abstract SinkAndCounters sinkWithPostCommitWithRetry();
+
+    abstract SinkAndCounters sinkWithoutPostCommit();
 
-        @Override
-        public void close() throws Exception {}
+    static class SinkAndCounters {
+        TwoPhaseCommittingSink<?, String> sink;
+        IntSupplier commitCounter;
 
-        public int getSuccessfulCommits() {
-            return successfulCommits;
+        public SinkAndCounters(TwoPhaseCommittingSink<?, String> sink, 
IntSupplier commitCounter) {
+            this.sink = sink;
+            this.commitCounter = commitCounter;
         }
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
index c309835cc75..b457b5a0b78 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
@@ -52,7 +52,7 @@ class SinkTestUtil {
     static byte[] toBytes(String obj) {
         try {
             return SimpleVersionedSerialization.writeVersionAndSerialize(
-                    TestSink.StringCommittableSerializer.INSTANCE, obj);
+                    TestSinkV2.StringSerializer.INSTANCE, obj);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
@@ -83,7 +83,7 @@ class SinkTestUtil {
     static String fromBytes(byte[] obj) {
         try {
             return SimpleVersionedSerialization.readVersionAndDeSerialize(
-                    TestSink.StringCommittableSerializer.INSTANCE, obj);
+                    TestSinkV2.StringSerializer.INSTANCE, obj);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
new file mode 100644
index 00000000000..ed8e53ff342
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.util.Collection;
+
+class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
+    @Override
+    SinkAndCounters sinkWithPostCommit() {
+        ForwardingCommitter committer = new ForwardingCommitter();
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSinkV2.newBuilder()
+                                .setCommitter(committer)
+                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                                .setWithPostCommitTopology(true)
+                                .build(),
+                () -> committer.successfulCommits);
+    }
+
+    @Override
+    SinkAndCounters sinkWithPostCommitWithRetry() {
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSinkV2.newBuilder()
+                                .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
+                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                                .setWithPostCommitTopology(true)
+                                .build(),
+                () -> 0);
+    }
+
+    @Override
+    SinkAndCounters sinkWithoutPostCommit() {
+        ForwardingCommitter committer = new ForwardingCommitter();
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSinkV2.newBuilder()
+                                .setCommitter(committer)
+                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                                .setWithPostCommitTopology(false)
+                                .build(),
+                () -> committer.successfulCommits);
+    }
+
+    private static class ForwardingCommitter extends 
TestSinkV2.DefaultCommitter {
+        private int successfulCommits = 0;
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            successfulCommits += committables.size();
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
new file mode 100644
index 00000000000..5bb0135d69b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
+
+    @Override
+    SinkAndSuppliers sinkWithoutCommitter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithCommitter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+                new TestSinkV2.DefaultCommittingSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithTimeBasedWriter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
+        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+        TestSinkV2.Builder<Integer> builder =
+                TestSinkV2.newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .setWithPostCommitTopology(true);
+        if (withState) {
+            builder.setWriterState(true);
+        }
+        if (stateName != null) {
+            builder.setCompatibleStateNames(stateName);
+        }
+        return new SinkAndSuppliers(
+                builder.build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> sinkWriter.lastCheckpointId,
+                () -> new TestSinkV2.StringSerializer());
+    }
+
+    private static class TimeBasedBufferingSinkWriter
+            extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
+            implements ProcessingTimeService.ProcessingTimeCallback {
+
+        private final List<String> cachedCommittables = new ArrayList<>();
+        private ProcessingTimeService processingTimeService;
+
+        @Override
+        public void write(Integer element, Context context) {
+            cachedCommittables.add(
+                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+        }
+
+        @Override
+        public void onProcessingTime(long time) {
+            elements.addAll(cachedCommittables);
+            cachedCommittables.clear();
+            this.processingTimeService.registerTimer(time + 1000, this);
+        }
+
+        @Override
+        public void init(org.apache.flink.api.connector.sink2.Sink.InitContext 
context) {
+            this.processingTimeService = context.getProcessingTimeService();
+            this.processingTimeService.registerTimer(1000, this);
+        }
+    }
+
+    private static class SnapshottingBufferingSinkWriter
+            extends TestSinkV2.DefaultStatefulSinkWriter {
+        public static final int NOT_SNAPSHOTTED = -1;
+        long lastCheckpointId = NOT_SNAPSHOTTED;
+        boolean endOfInput = false;
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            this.endOfInput = endOfInput;
+        }
+
+        @Override
+        public List<String> snapshotState(long checkpointId) throws 
IOException {
+            lastCheckpointId = checkpointId;
+            return super.snapshotState(checkpointId);
+        }
+
+        @Override
+        public Collection<String> prepareCommit() {
+            if (!endOfInput) {
+                return ImmutableList.of();
+            }
+            List<String> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
similarity index 77%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index 86cc85c3155..464b08c8f67 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -62,29 +62,31 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
 import static org.assertj.core.api.Assertions.assertThat;
 
-class SinkWriterOperatorTest {
+abstract class SinkWriterOperatorTestBase {
 
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
-        final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
+        SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(sinkWriter).build().asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
         testHarness.open();
         testHarness.processElement(1, 1);
 
         assertThat(testHarness.getOutput()).isEmpty();
-        assertThat(sinkWriter.elements).containsOnly("(1,1," + Long.MIN_VALUE 
+ ")");
+        assertThat(sinkAndSuppliers.elementSupplier.get())
+                .containsOnly("(1,1," + Long.MIN_VALUE + ")");
 
         testHarness.prepareSnapshotPreBarrier(1);
         assertThat(testHarness.getOutput()).isEmpty();
         // Elements are flushed
-        assertThat(sinkWriter.elements).isEmpty();
+        assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
         testHarness.close();
     }
 
@@ -92,11 +94,10 @@ class SinkWriterOperatorTest {
     void testWatermarkPropagatedToSinkWriter() throws Exception {
         final long initialTime = 0;
 
-        final TestSink.DefaultSinkWriter<Integer> writer = new 
TestSink.DefaultSinkWriter<>();
+        SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(writer).build().asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
         testHarness.open();
 
         testHarness.processWatermark(initialTime);
@@ -104,7 +105,7 @@ class SinkWriterOperatorTest {
 
         assertThat(testHarness.getOutput())
                 .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
-        assertThat(writer.watermarks)
+        assertThat(sinkAndSuppliers.watermarkSupplier.get())
                 .containsExactly(
                         new 
org.apache.flink.api.common.eventtime.Watermark(initialTime),
                         new 
org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));
@@ -117,12 +118,7 @@ class SinkWriterOperatorTest {
 
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder()
-                                        .setDefaultCommitter()
-                                        .setWriter(new 
TimeBasedBufferingSinkWriter())
-                                        .build()
-                                        .asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().sink));
 
         testHarness.open();
 
@@ -149,8 +145,7 @@ class SinkWriterOperatorTest {
     void testEmitOnFlushWithCommitter() throws Exception {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setDefaultCommitter().build().asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().sink));
 
         testHarness.open();
         assertThat(testHarness.getOutput()).isEmpty();
@@ -168,8 +163,7 @@ class SinkWriterOperatorTest {
     @Test
     void testEmitOnEndOfInputInBatchMode() throws Exception {
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(
-                        
TestSink.newBuilder().setDefaultCommitter().build().asV2());
+                new SinkWriterOperatorFactory<>(sinkWithCommitter().sink);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
 
@@ -187,10 +181,9 @@ class SinkWriterOperatorTest {
 
         final long initialTime = 0;
 
-        final SnapshottingBufferingSinkWriter snapshottingWriter =
-                new SnapshottingBufferingSinkWriter();
+        final SinkAndSuppliers sinkAndSuppliers = 
sinkWithSnapshottingWriter(stateful, null);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                createTestHarnessWithBufferingSinkWriter(snapshottingWriter, 
stateful);
+                
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
 
         testHarness.open();
 
@@ -204,15 +197,14 @@ class SinkWriterOperatorTest {
         // we see the watermark and the committable summary, so the 
committables must be stored in
         // state
         assertThat(testHarness.getOutput()).hasSize(2).contains(new 
Watermark(initialTime));
-        assertThat(snapshottingWriter.lastCheckpointId)
-                .isEqualTo(stateful ? 1L : 
SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED);
+        assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong())
+                .isEqualTo(stateful ? 1L : -1L);
 
         testHarness.close();
 
+        final SinkAndSuppliers restoredSink = 
sinkWithSnapshottingWriter(stateful, null);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                restoredTestHarness =
-                        createTestHarnessWithBufferingSinkWriter(
-                                new SnapshottingBufferingSinkWriter(), 
stateful);
+                restoredTestHarness = 
createTestHarnessWithBufferingSinkWriter(restoredSink.sink);
 
         restoredTestHarness.initializeState(snapshot);
         restoredTestHarness.open();
@@ -221,6 +213,7 @@ class SinkWriterOperatorTest {
         restoredTestHarness.endInput();
         final long checkpointId = 2;
         restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
+        restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId);
 
         if (stateful) {
             assertBasicOutput(restoredTestHarness.getOutput(), 2, 
Long.MAX_VALUE);
@@ -246,16 +239,20 @@ class SinkWriterOperatorTest {
                         "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
                         "cost", "prompt");
 
+        SinkAndSuppliers sinkAndSuppliers =
+                sinkWithSnapshottingWriter(stateful, 
DummySinkOperator.DUMMY_SINK_STATE_NAME);
         final OneInputStreamOperatorTestHarness<String, String> previousSink =
                 new OneInputStreamOperatorTestHarness<>(
-                        new DummySinkOperator(), StringSerializer.INSTANCE);
+                        new 
DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()),
+                        StringSerializer.INSTANCE);
 
         OperatorSubtaskState previousSinkState =
                 TestHarnessUtil.buildSubtaskState(previousSink, 
previousSinkInputs);
 
         // 2. Load previous sink state and verify the output
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                compatibleWriterOperator = 
createCompatibleStateTestHarness(stateful);
+                compatibleWriterOperator =
+                        
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
 
         final List<String> expectedOutput1 =
                 stateful ? new ArrayList<>(previousSinkInputs) : new 
ArrayList<>();
@@ -280,8 +277,11 @@ class SinkWriterOperatorTest {
         assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());
 
         // 3. Restore the sink without previous sink's state
+        SinkAndSuppliers sinkAndSuppliers2 =
+                sinkWithSnapshottingWriter(stateful, 
DummySinkOperator.DUMMY_SINK_STATE_NAME);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                restoredSinkOperator = 
createCompatibleStateTestHarness(stateful);
+                restoredSinkOperator =
+                        
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink);
         final List<String> expectedOutput2 =
                 Arrays.asList(
                         Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
@@ -306,22 +306,18 @@ class SinkWriterOperatorTest {
     void testRestoreCommitterState() throws Exception {
         final List<String> committables = Arrays.asList("state1", "state2");
 
+        SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
         final OneInputStreamOperatorTestHarness<String, String> committer =
                 new OneInputStreamOperatorTestHarness<>(
-                        new TestCommitterOperator(), 
StringSerializer.INSTANCE);
+                        new 
TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()),
+                        StringSerializer.INSTANCE);
 
         final OperatorSubtaskState committerState =
                 TestHarnessUtil.buildSubtaskState(committer, committables);
 
-        final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder()
-                                        .setDefaultCommitter()
-                                        .setWriter(sinkWriter)
-                                        .build()
-                                        .asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
 
         testHarness.initializeState(committerState);
 
@@ -361,21 +357,16 @@ class SinkWriterOperatorTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
-        final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
+        SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new SinkWriterOperatorFactory<>(
-                                TestSink.newBuilder()
-                                        .setWriter(sinkWriter)
-                                        .setDefaultCommitter()
-                                        .build()
-                                        .asV2()));
+                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
         testHarness.open();
         testHarness.processElement(1, 1);
 
         assertThat(testHarness.getOutput()).isEmpty();
         final String record = "(1,1," + Long.MIN_VALUE + ")";
-        assertThat(sinkWriter.elements).containsOnly(record);
+        
assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(record);
 
         testHarness.endInput();
 
@@ -384,7 +375,7 @@ class SinkWriterOperatorTest {
         }
 
         assertEmitted(Collections.singletonList(record), 
testHarness.getOutput());
-        assertThat(sinkWriter.elements).isEmpty();
+        assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
 
         testHarness.close();
     }
@@ -472,33 +463,9 @@ class SinkWriterOperatorTest {
     }
 
     private static OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-            createTestHarnessWithBufferingSinkWriter(
-                    SnapshottingBufferingSinkWriter snapshottingWriter, 
boolean stateful)
-                    throws Exception {
-        final TestSink.Builder<Integer> builder =
-                
TestSink.newBuilder().setDefaultCommitter().setWriter(snapshottingWriter);
-        if (stateful) {
-            builder.withWriterState();
-        }
+            createTestHarnessWithBufferingSinkWriter(Sink sink) throws 
Exception {
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(builder.build().asV2());
-        return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
-    }
-
-    private static OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-            createCompatibleStateTestHarness(boolean stateful) throws 
Exception {
-        final SnapshottingBufferingSinkWriter snapshottingWriter =
-                new SnapshottingBufferingSinkWriter();
-        final TestSink.Builder<Integer> builder =
-                TestSink.newBuilder()
-                        .setDefaultCommitter()
-                        
.setCompatibleStateNames(DummySinkOperator.DUMMY_SINK_STATE_NAME)
-                        .setWriter(snapshottingWriter);
-        if (stateful) {
-            builder.withWriterState();
-        }
-        final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(builder.build().asV2());
+                new SinkWriterOperatorFactory<>(sink);
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 
@@ -527,30 +494,6 @@ class SinkWriterOperatorTest {
         }
     }
 
-    private static class TimeBasedBufferingSinkWriter extends 
TestSink.DefaultSinkWriter<Integer>
-            implements Sink.ProcessingTimeService.ProcessingTimeCallback {
-
-        private final List<String> cachedCommittables = new ArrayList<>();
-
-        @Override
-        public void write(Integer element, Context context) {
-            cachedCommittables.add(
-                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
-        }
-
-        void setProcessingTimerService(Sink.ProcessingTimeService 
processingTimerService) {
-            super.setProcessingTimerService(processingTimerService);
-            this.processingTimerService.registerProcessingTimer(1000, this);
-        }
-
-        @Override
-        public void onProcessingTime(long time) {
-            elements.addAll(cachedCommittables);
-            cachedCommittables.clear();
-            this.processingTimerService.registerProcessingTimer(time + 1000, 
this);
-        }
-    }
-
     private static class TestCommitterOperator extends 
AbstractStreamOperator<String>
             implements OneInputStreamOperator<String, String> {
 
@@ -559,6 +502,11 @@ class SinkWriterOperatorTest {
                         "streaming_committer_raw_states", 
BytePrimitiveArraySerializer.INSTANCE);
         private ListState<List<String>> committerState;
         private final List<String> buffer = new ArrayList<>();
+        private final SimpleVersionedSerializer<String> serializer;
+
+        public TestCommitterOperator(SimpleVersionedSerializer<String> 
serializer) {
+            this.serializer = serializer;
+        }
 
         @Override
         public void initializeState(StateInitializationContext context) throws 
Exception {
@@ -567,8 +515,7 @@ class SinkWriterOperatorTest {
                     new SimpleVersionedListState<>(
                             context.getOperatorStateStore()
                                     
.getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
-                            new TestingCommittableSerializer(
-                                    
TestSink.StringCommittableSerializer.INSTANCE));
+                            new TestingCommittableSerializer(serializer));
         }
 
         @Override
@@ -592,13 +539,18 @@ class SinkWriterOperatorTest {
                 new ListStateDescriptor<>(
                         DUMMY_SINK_STATE_NAME, 
BytePrimitiveArraySerializer.INSTANCE);
         ListState<String> sinkState;
+        private final SimpleVersionedSerializer<String> serializer;
+
+        public DummySinkOperator(SimpleVersionedSerializer<String> serializer) 
{
+            this.serializer = serializer;
+        }
 
         public void initializeState(StateInitializationContext context) throws 
Exception {
             super.initializeState(context);
             sinkState =
                     new SimpleVersionedListState<>(
                             
context.getOperatorStateStore().getListState(SINK_STATE_DESC),
-                            TestSink.StringCommittableSerializer.INSTANCE);
+                            serializer);
         }
 
         @Override
@@ -607,33 +559,6 @@ class SinkWriterOperatorTest {
         }
     }
 
-    private static class SnapshottingBufferingSinkWriter
-            extends TestSink.DefaultSinkWriter<Integer> {
-        public static final int NOT_SNAPSHOTTED = -1;
-        long lastCheckpointId = NOT_SNAPSHOTTED;
-
-        @Override
-        public List<String> snapshotState(long checkpointId) {
-            lastCheckpointId = checkpointId;
-            return elements;
-        }
-
-        @Override
-        void restoredFrom(List<String> states) {
-            this.elements = new ArrayList<>(states);
-        }
-
-        @Override
-        public List<String> prepareCommit(boolean flush) {
-            if (!flush) {
-                return Collections.emptyList();
-            }
-            List<String> result = elements;
-            elements = new ArrayList<>();
-            return result;
-        }
-    }
-
     private static class TestingCommittableSerializer
             extends SinkV1WriterCommittableSerializer<String> {
 
@@ -654,4 +579,33 @@ class SinkWriterOperatorTest {
             return out.getCopyOfBuffer();
         }
     }
+
+    abstract SinkAndSuppliers sinkWithoutCommitter();
+
+    abstract SinkAndSuppliers sinkWithTimeBasedWriter();
+
+    abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, 
String stateName);
+
+    abstract SinkAndSuppliers sinkWithCommitter();
+
+    static class SinkAndSuppliers {
+        org.apache.flink.api.connector.sink2.Sink<Integer> sink;
+        Supplier<List<String>> elementSupplier;
+        Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> 
watermarkSupplier;
+        LongSupplier lastCheckpointSupplier;
+        Supplier<SimpleVersionedSerializer<String>> serializerSupplier;
+
+        public SinkAndSuppliers(
+                org.apache.flink.api.connector.sink2.Sink<Integer> sink,
+                Supplier<List<String>> elementSupplier,
+                
Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> 
watermarkSupplier,
+                LongSupplier lastCheckpointSupplier,
+                Supplier<SimpleVersionedSerializer<String>> 
serializerSupplier) {
+            this.sink = sink;
+            this.elementSupplier = elementSupplier;
+            this.watermarkSupplier = watermarkSupplier;
+            this.lastCheckpointSupplier = lastCheckpointSupplier;
+            this.serializerSupplier = serializerSupplier;
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
index ce00362f6a0..742e4438b5d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
@@ -49,7 +49,13 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertNotNull;
 
-/** A {@link Sink TestSink} for all the sink related tests. */
+/**
+ * A {@link Sink TestSink} for all the sink related tests. Use only for tests 
where {@link
+ * SinkV1Adapter} should be tested.
+ *
+ * @deprecated Use {@link TestSinkV2} instead.
+ */
+@Deprecated
 public class TestSink<T> implements Sink<T, String, String, String> {
 
     public static final String END_OF_INPUT_STR = "end of input";
@@ -181,11 +187,6 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
             return this;
         }
 
-        public Builder<T> setGlobalCommitter(GlobalCommitter<String, String> 
globalCommitter) {
-            this.globalCommitter = globalCommitter;
-            return this;
-        }
-
         public Builder<T> setGlobalCommittableSerializer(
                 SimpleVersionedSerializer<String> globalCommittableSerializer) 
{
             this.globalCommittableSerializer = globalCommittableSerializer;
@@ -363,10 +364,6 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
 
         private final String committedSuccessData;
 
-        DefaultGlobalCommitter() {
-            this("");
-        }
-
         DefaultGlobalCommitter(String committedSuccessData) {
             this.committedSuccessData = committedSuccessData;
         }
@@ -397,39 +394,6 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
         }
     }
 
-    /** A {@link GlobalCommitter} that always re-commits global committables 
it received. */
-    static class RetryOnceGlobalCommitter extends DefaultGlobalCommitter {
-
-        private final Set<String> seen = new LinkedHashSet<>();
-
-        @Override
-        public List<String> filterRecoveredCommittables(List<String> 
globalCommittables) {
-            return globalCommittables;
-        }
-
-        @Override
-        public String combine(List<String> committables) {
-            return String.join("|", committables);
-        }
-
-        @Override
-        public void endOfInput() {}
-
-        @Override
-        public List<String> commit(List<String> committables) {
-            committables.forEach(
-                    c -> {
-                        if (seen.remove(c)) {
-                            checkNotNull(committedData);
-                            committedData.add(c);
-                        } else {
-                            seen.add(c);
-                        }
-                    });
-            return new ArrayList<>(seen);
-        }
-    }
-
     /**
      * We introduce this {@link StringCommittableSerializer} is because that 
all the fields of
      * {@link TestSink} should be serializable.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
new file mode 100644
index 00000000000..bb89bf0fe7c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertNotNull;
+
+/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink 
related tests. */
+public class TestSinkV2<InputT> implements Sink<InputT> {
+
+    private final DefaultSinkWriter<InputT> writer;
+
+    private TestSinkV2(DefaultSinkWriter<InputT> writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public SinkWriter<InputT> createWriter(InitContext context) {
+        writer.init(context);
+        return writer;
+    }
+
+    DefaultSinkWriter<InputT> getWriter() {
+        return writer;
+    }
+
+    public static <InputT> Builder<InputT> newBuilder() {
+        return new Builder<>();
+    }
+
+    /** A builder class for {@link TestSinkV2}. */
+    public static class Builder<InputT> {
+        private DefaultSinkWriter<InputT> writer = null;
+        private DefaultCommitter committer;
+        private SimpleVersionedSerializer<String> committableSerializer;
+        private boolean withPostCommitTopology = false;
+        private boolean withWriterState = false;
+        private String compatibleStateNames;
+
+        public Builder<InputT> setWriter(DefaultSinkWriter<InputT> writer) {
+            this.writer = checkNotNull(writer);
+            return this;
+        }
+
+        public Builder<InputT> setCommitter(DefaultCommitter committer) {
+            this.committer = committer;
+            return this;
+        }
+
+        public Builder<InputT> setCommittableSerializer(
+                SimpleVersionedSerializer<String> committableSerializer) {
+            this.committableSerializer = committableSerializer;
+            return this;
+        }
+
+        public Builder<InputT> setDefaultCommitter() {
+            this.committer = new DefaultCommitter();
+            this.committableSerializer = StringSerializer.INSTANCE;
+            return this;
+        }
+
+        public Builder<InputT> setDefaultCommitter(
+                Supplier<Queue<Committer.CommitRequest<String>>> 
queueSupplier) {
+            this.committer = new DefaultCommitter(queueSupplier);
+            this.committableSerializer = StringSerializer.INSTANCE;
+            return this;
+        }
+
+        public Builder<InputT> setWithPostCommitTopology(boolean 
withPostCommitTopology) {
+            this.withPostCommitTopology = withPostCommitTopology;
+            return this;
+        }
+
+        public Builder<InputT> setWriterState(boolean withWriterState) {
+            this.withWriterState = withWriterState;
+            return this;
+        }
+
+        public Builder<InputT> setCompatibleStateNames(String 
compatibleStateNames) {
+            this.compatibleStateNames = compatibleStateNames;
+            return this;
+        }
+
+        public TestSinkV2<InputT> build() {
+            if (committer == null) {
+                if (writer == null) {
+                    writer = new DefaultSinkWriter<>();
+                }
+                // SinkV2 with a simple writer
+                return new TestSinkV2<>(writer);
+            } else {
+                if (writer == null) {
+                    writer = new DefaultCommittingSinkWriter<>();
+                }
+                if (!withPostCommitTopology) {
+                    // TwoPhaseCommittingSink with a stateless writer and a 
committer
+                    return new TestSinkV2TwoPhaseCommittingSink<>(
+                            writer, committableSerializer, committer);
+                } else {
+                    if (withWriterState) {
+                        // TwoPhaseCommittingSink with a stateful writer and a 
committer and post
+                        // commit topology
+                        Preconditions.checkArgument(
+                                writer instanceof DefaultStatefulSinkWriter,
+                                "Please provide a DefaultStatefulSinkWriter 
instance");
+                        return new TestStatefulSinkV2(
+                                (DefaultStatefulSinkWriter) writer,
+                                committableSerializer,
+                                committer,
+                                compatibleStateNames);
+                    } else {
+                        // TwoPhaseCommittingSink with a stateless writer and 
a committer and post
+                        // commit topology
+                        Preconditions.checkArgument(
+                                writer instanceof DefaultCommittingSinkWriter,
+                                "Please provide a DefaultCommittingSinkWriter 
instance");
+                        return new TestSinkV2WithPostCommitTopology<>(
+                                (DefaultCommittingSinkWriter) writer,
+                                committableSerializer,
+                                committer);
+                    }
+                }
+            }
+        }
+    }
+
+    private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends 
TestSinkV2<InputT>
+            implements TwoPhaseCommittingSink<InputT, String> {
+        private final DefaultCommitter committer;
+        private final SimpleVersionedSerializer<String> committableSerializer;
+
+        public TestSinkV2TwoPhaseCommittingSink(
+                DefaultSinkWriter<InputT> writer,
+                SimpleVersionedSerializer<String> committableSerializer,
+                DefaultCommitter committer) {
+            super(writer);
+            this.committer = committer;
+            this.committableSerializer = committableSerializer;
+        }
+
+        @Override
+        public Committer<String> createCommitter() {
+            committer.init();
+            return committer;
+        }
+
+        @Override
+        public SimpleVersionedSerializer<String> getCommittableSerializer() {
+            return committableSerializer;
+        }
+
+        @Override
+        public PrecommittingSinkWriter<InputT, String> 
createWriter(InitContext context) {
+            return (PrecommittingSinkWriter<InputT, String>) 
super.createWriter(context);
+        }
+    }
+
+    // -------------------------------------- Sink With PostCommitTopology 
-------------------------
+
+    private static class TestSinkV2WithPostCommitTopology<InputT>
+            extends TestSinkV2TwoPhaseCommittingSink<InputT>
+            implements WithPostCommitTopology<InputT, String> {
+        public TestSinkV2WithPostCommitTopology(
+                DefaultSinkWriter<InputT> writer,
+                SimpleVersionedSerializer<String> committableSerializer,
+                DefaultCommitter committer) {
+            super(writer, committableSerializer, committer);
+        }
+
+        @Override
+        public void 
addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
+            // We do not need to do anything for tests
+        }
+    }
+
+    private static class TestStatefulSinkV2<InputT> extends 
TestSinkV2WithPostCommitTopology<InputT>
+            implements StatefulSink<InputT, String>, 
StatefulSink.WithCompatibleState {
+        private String compatibleState;
+
+        public TestStatefulSinkV2(
+                DefaultStatefulSinkWriter<InputT> writer,
+                SimpleVersionedSerializer<String> committableSerializer,
+                DefaultCommitter committer,
+                String compatibleState) {
+            super(writer, committableSerializer, committer);
+            this.compatibleState = compatibleState;
+        }
+
+        @Override
+        public DefaultStatefulSinkWriter<InputT> createWriter(InitContext 
context) {
+            return (DefaultStatefulSinkWriter<InputT>) 
super.createWriter(context);
+        }
+
+        @Override
+        public StatefulSinkWriter<InputT, String> restoreWriter(
+                InitContext context, Collection<String> recoveredState) {
+            DefaultStatefulSinkWriter<InputT> statefulWriter =
+                    (DefaultStatefulSinkWriter) getWriter();
+
+            statefulWriter.restore(recoveredState);
+            return statefulWriter;
+        }
+
+        @Override
+        public SimpleVersionedSerializer<String> getWriterStateSerializer() {
+            return new StringSerializer();
+        }
+
+        @Override
+        public Collection<String> getCompatibleWriterStateNames() {
+            return compatibleState == null ? ImmutableSet.of() : 
ImmutableSet.of(compatibleState);
+        }
+    }
+
+    // -------------------------------------- Sink Writer 
------------------------------------------
+
+    /** Base class for out testing {@link SinkWriter}. */
+    public static class DefaultSinkWriter<InputT> implements 
SinkWriter<InputT>, Serializable {
+
+        protected List<String> elements;
+
+        protected List<Watermark> watermarks;
+
+        protected DefaultSinkWriter() {
+            this.elements = new ArrayList<>();
+            this.watermarks = new ArrayList<>();
+        }
+
+        @Override
+        public void write(InputT element, Context context) {
+            elements.add(
+                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+        }
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            elements = new ArrayList<>();
+        }
+
+        @Override
+        public void writeWatermark(Watermark watermark) {
+            watermarks.add(watermark);
+        }
+
+        @Override
+        public void close() throws Exception {
+            // noting to do here
+        }
+
+        public void init(InitContext context) {
+            // context is not used in default case
+        }
+    }
+
+    /** Base class for out testing {@link 
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
+    protected static class DefaultCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
+            implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, 
String>,
+                    Serializable {
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            // We empty the elements on prepareCommit
+        }
+
+        @Override
+        public Collection<String> prepareCommit() {
+            List<String> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+
+    /**
+     * Base class for out testing {@link StatefulSink.StatefulSinkWriter}. 
Extends the {@link
+     * DefaultCommittingSinkWriter} for simplicity.
+     */
+    protected static class DefaultStatefulSinkWriter<InputT>
+            extends DefaultCommittingSinkWriter<InputT>
+            implements StatefulSink.StatefulSinkWriter<InputT, String> {
+
+        @Override
+        public List<String> snapshotState(long checkpointId) throws 
IOException {
+            return elements;
+        }
+
+        protected void restore(Collection<String> recoveredState) {
+            this.elements = new ArrayList<>(recoveredState);
+        }
+    }
+
+    // -------------------------------------- Sink Committer 
---------------------------------------
+
+    /** Base class for testing {@link Committer}. */
+    static class DefaultCommitter implements Committer<String>, Serializable {
+
+        @Nullable protected Queue<CommitRequest<String>> committedData;
+
+        private boolean isClosed;
+
+        @Nullable private final Supplier<Queue<CommitRequest<String>>> 
queueSupplier;
+
+        public DefaultCommitter() {
+            this.committedData = new ConcurrentLinkedQueue<>();
+            this.isClosed = false;
+            this.queueSupplier = null;
+        }
+
+        public DefaultCommitter(@Nullable 
Supplier<Queue<CommitRequest<String>>> queueSupplier) {
+            this.queueSupplier = queueSupplier;
+            this.isClosed = false;
+            this.committedData = null;
+        }
+
+        public List<CommitRequest<String>> getCommittedData() {
+            if (committedData != null) {
+                return new ArrayList<>(committedData);
+            } else {
+                return Collections.emptyList();
+            }
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            if (committedData == null) {
+                assertNotNull(queueSupplier);
+                committedData = queueSupplier.get();
+            }
+            committedData.addAll(committables);
+        }
+
+        public void close() throws Exception {
+            isClosed = true;
+        }
+
+        public boolean isClosed() {
+            return isClosed;
+        }
+
+        public void init() {
+            // context is not used for this implementation
+        }
+    }
+
+    /** A {@link Committer} that always re-commits the committables data it 
received. */
+    static class RetryOnceCommitter extends DefaultCommitter {
+
+        private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            committables.forEach(
+                    c -> {
+                        if (seen.remove(c)) {
+                            checkNotNull(committedData);
+                            committedData.add(c);
+                        } else {
+                            seen.add(c);
+                            c.retryLater();
+                        }
+                    });
+        }
+    }
+
+    /**
+     * We introduce this {@link StringSerializer} is because that all the 
fields of {@link
+     * TestSinkV2} should be serializable.
+     */
+    public static class StringSerializer
+            implements SimpleVersionedSerializer<String>, Serializable {
+
+        public static final StringSerializer INSTANCE = new StringSerializer();
+
+        @Override
+        public int getVersion() {
+            return SimpleVersionedStringSerializer.INSTANCE.getVersion();
+        }
+
+        @Override
+        public byte[] serialize(String obj) {
+            return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
+        }
+
+        @Override
+        public String deserialize(int version, byte[] serialized) throws 
IOException {
+            return 
SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
new file mode 100644
index 00000000000..c516db87467
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.util.Collections;
+import java.util.List;
+
+class WithAdapterCommitterOperatorTest extends CommitterOperatorTestBase {
+
+    @Override
+    SinkAndCounters sinkWithPostCommit() {
+        ForwardingCommitter committer = new ForwardingCommitter();
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSink.newBuilder()
+                                .setCommitter(committer)
+                                .setDefaultGlobalCommitter()
+                                .setCommittableSerializer(
+                                        
TestSink.StringCommittableSerializer.INSTANCE)
+                                .build()
+                                .asV2(),
+                () -> committer.successfulCommits);
+    }
+
+    @Override
+    SinkAndCounters sinkWithPostCommitWithRetry() {
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSink.newBuilder()
+                                .setCommitter(new 
TestSink.RetryOnceCommitter())
+                                .setDefaultGlobalCommitter()
+                                .setCommittableSerializer(
+                                        
TestSink.StringCommittableSerializer.INSTANCE)
+                                .build()
+                                .asV2(),
+                () -> 0);
+    }
+
+    @Override
+    SinkAndCounters sinkWithoutPostCommit() {
+        ForwardingCommitter committer = new ForwardingCommitter();
+        return new SinkAndCounters(
+                (TwoPhaseCommittingSink<?, String>)
+                        TestSink.newBuilder()
+                                .setCommitter(committer)
+                                .setCommittableSerializer(
+                                        
TestSink.StringCommittableSerializer.INSTANCE)
+                                .build()
+                                .asV2(),
+                () -> committer.successfulCommits);
+    }
+
+    private static class ForwardingCommitter extends TestSink.DefaultCommitter 
{
+        private int successfulCommits = 0;
+
+        @Override
+        public List<String> commit(List<String> committables) {
+            successfulCommits += committables.size();
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
new file mode 100644
index 00000000000..5af5ac5a679
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
+
+    @Override
+    SinkAndSuppliers sinkWithoutCommitter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithCommitter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithTimeBasedWriter() {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
+        return new SinkAndSuppliers(
+                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
+        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+        TestSink.Builder<Integer> builder =
+                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter();
+        if (withState) {
+            builder.withWriterState();
+        }
+        if (stateName != null) {
+            builder.setCompatibleStateNames(stateName);
+        }
+        return new SinkAndSuppliers(
+                builder.build().asV2(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> sinkWriter.lastCheckpointId,
+                () -> new TestSink.StringCommittableSerializer());
+    }
+
+    private static class TimeBasedBufferingSinkWriter extends 
TestSink.DefaultSinkWriter<Integer>
+            implements Sink.ProcessingTimeService.ProcessingTimeCallback {
+
+        private final List<String> cachedCommittables = new ArrayList<>();
+
+        @Override
+        public void write(Integer element, Context context) {
+            cachedCommittables.add(
+                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+        }
+
+        void setProcessingTimerService(Sink.ProcessingTimeService 
processingTimerService) {
+            super.setProcessingTimerService(processingTimerService);
+            this.processingTimerService.registerProcessingTimer(1000, this);
+        }
+
+        @Override
+        public void onProcessingTime(long time) {
+            elements.addAll(cachedCommittables);
+            cachedCommittables.clear();
+            this.processingTimerService.registerProcessingTimer(time + 1000, 
this);
+        }
+    }
+
+    private static class SnapshottingBufferingSinkWriter
+            extends TestSink.DefaultSinkWriter<Integer> {
+        public static final int NOT_SNAPSHOTTED = -1;
+        long lastCheckpointId = NOT_SNAPSHOTTED;
+
+        @Override
+        public List<String> snapshotState(long checkpointId) {
+            lastCheckpointId = checkpointId;
+            return elements;
+        }
+
+        @Override
+        void restoredFrom(List<String> states) {
+            this.elements = new ArrayList<>(states);
+        }
+
+        @Override
+        public List<String> prepareCommit(boolean flush) {
+            if (!flush) {
+                return Collections.emptyList();
+            }
+            List<String> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
new file mode 100644
index 00000000000..70d94004748
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/**
+ * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run 
time implementation.
+ */
+public class SinkV2ITCase extends AbstractTestBase {
+    static final List<Integer> SOURCE_DATA =
+            Arrays.asList(
+                    895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 
630, 682, 765, 434, 970,
+                    714, 795, 288, 422);
+
+    // source send data two times
+    static final int STREAMING_SOURCE_SEND_ELEMENTS_NUM = SOURCE_DATA.size() * 
2;
+
+    static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
+            SOURCE_DATA.stream()
+                    // source send data two times
+                    .flatMap(
+                            x ->
+                                    Collections.nCopies(
+                                            2, Tuple3.of(x, null, 
Long.MIN_VALUE).toString())
+                                            .stream())
+                    .collect(Collectors.toList());
+
+    static final List<String> EXPECTED_COMMITTED_DATA_IN_BATCH_MODE =
+            SOURCE_DATA.stream()
+                    .map(x -> Tuple3.of(x, null, Long.MIN_VALUE).toString())
+                    .collect(Collectors.toList());
+
+    static final Queue<Committer.CommitRequest<String>> COMMIT_QUEUE =
+            new ConcurrentLinkedQueue<>();
+
+    static final BooleanSupplier COMMIT_QUEUE_RECEIVE_ALL_DATA =
+            (BooleanSupplier & Serializable)
+                    () -> COMMIT_QUEUE.size() == 
STREAMING_SOURCE_SEND_ELEMENTS_NUM;
+
+    @Before
+    public void init() {
+        COMMIT_QUEUE.clear();
+    }
+
+    @Test
+    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnv();
+        final FiniteTestSource<Integer> source =
+                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
+
+        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .build());
+        env.execute();
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
+    }
+
+    @Test
+    public void writerAndCommitterExecuteInBatchMode() throws Exception {
+        final StreamExecutionEnvironment env = buildBatchEnv();
+
+        env.fromCollection(SOURCE_DATA)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .build());
+        env.execute();
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
+    }
+
+    private StreamExecutionEnvironment buildStreamEnv() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+        return env;
+    }
+
+    private StreamExecutionEnvironment buildBatchEnv() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        return env;
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
new file mode 100644
index 00000000000..76fcc7b66cc
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.stream.LongStream;
+
+import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter;
+import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+/** Tests whether all provided metrics of a {@link Sink} are of the expected 
values (FLIP-33). */
+public class SinkV2MetricsITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
+    private static final int DEFAULT_PARALLELISM = 4;
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+                            .build());
+
+    @Test
+    public void testMetrics() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        int numSplits = Math.max(1, env.getParallelism() - 2);
+
+        int numRecordsPerSplit = 10;
+
+        // make sure all parallel instances have processed the same amount of 
records before
+        // validating metrics
+        SharedReference<CyclicBarrier> beforeBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        SharedReference<CyclicBarrier> afterBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        int stopAtRecord1 = 4;
+        int stopAtRecord2 = numRecordsPerSplit - 1;
+
+        env.fromSequence(0, numSplits - 1)
+                .<Long>flatMap(
+                        (split, collector) ->
+                                LongStream.range(0, 
numRecordsPerSplit).forEach(collector::collect))
+                .returns(BasicTypeInfo.LONG_TYPE_INFO)
+                .map(
+                        i -> {
+                            if (i % numRecordsPerSplit == stopAtRecord1
+                                    || i % numRecordsPerSplit == 
stopAtRecord2) {
+                                beforeBarrier.get().await();
+                                afterBarrier.get().await();
+                            }
+                            return i;
+                        })
+                .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new 
MetricWriter()).build())
+                .name(TEST_SINK_NAME);
+        JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
+
+        beforeBarrier.get().await();
+        assertSinkMetrics(jobId, stopAtRecord1, env.getParallelism(), 
numSplits);
+        afterBarrier.get().await();
+
+        beforeBarrier.get().await();
+        assertSinkMetrics(jobId, stopAtRecord2, env.getParallelism(), 
numSplits);
+        afterBarrier.get().await();
+
+        jobClient.getJobExecutionResult().get();
+    }
+
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    private void assertSinkMetrics(
+            JobID jobId, long processedRecordsPerSubtask, int parallelism, int 
numSplits) {
+        List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
+        assertThat(groups, hasSize(parallelism));
+
+        int subtaskWithMetrics = 0;
+        for (OperatorMetricGroup group : groups) {
+            Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
+            // There are only 2 splits assigned; so two groups will not update 
metrics.
+            if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() 
== 0) {
+                continue;
+            }
+            subtaskWithMetrics++;
+
+            // SinkWriterMetricGroup metrics
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_RECORDS_OUT))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_BYTES_OUT))
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+            // MetricWriter is just incrementing errors every even record
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS))
+                    .isEqualTo((processedRecordsPerSubtask + 1) / 2);
+
+            // Test "send" metric series has the same value as "out" metric 
series.
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.NUM_BYTES_SEND))
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS))
+                    .isEqualTo((processedRecordsPerSubtask + 1) / 2);
+
+            // check if the latest send time is fetched
+            assertThatGauge(metrics.get(MetricNames.CURRENT_SEND_TIME))
+                    .isEqualTo((processedRecordsPerSubtask - 1) * 
MetricWriter.BASE_SEND_TIME);
+        }
+        assertThat(subtaskWithMetrics, equalTo(numSplits));
+    }
+
+    private static class MetricWriter extends 
TestSinkV2.DefaultSinkWriter<Long> {
+        static final long BASE_SEND_TIME = 100;
+        static final long RECORD_SIZE_IN_BYTES = 10;
+        private SinkWriterMetricGroup metricGroup;
+        private long sendTime;
+
+        @Override
+        public void init(Sink.InitContext context) {
+            this.metricGroup = context.metricGroup();
+            metricGroup.setCurrentSendTimeGauge(() -> sendTime);
+        }
+
+        @Override
+        public void write(Long element, Context context) {
+            super.write(element, context);
+            sendTime = element * BASE_SEND_TIME;
+            metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
+            if (element % 2 == 0) {
+                metricGroup.getNumRecordsOutErrorsCounter().inc();
+            }
+            
metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+        }
+    }
+}

Reply via email to