This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4217408576552fc929d9c8331495a9282064cd9c
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Sep 10 11:47:02 2024 +0200

    [FLINK-25920] Refactor & Revise SinkWriterOperatorTestBase
    
    The stateful SinkWriterOperatorTestBase test cases used EOI to manipulate 
the state, which was never clean. In particular, they also stored the input 
elements in state until EOI arrived and emitted them all at once. For state 
restoration tests, we emitted records after EOI arrived.
    
    This commit changes the writer state to capture only the record 
count, which is much more realistic than storing the actual payload. The tests 
now directly assert on the state instead of the output.
    
    This commit also introduces an adapter for serializing basic types in the 
writer state and replaces the hard-to-maintain SinkAndSuppliers with an 
InspectableSink in the sink writer tests that require an abstraction on top of 
the different Sink flavors.
---
 .../core/io/SimpleVersionedSerializerAdapter.java  |  58 ++++++
 .../SinkV1TransformationTranslatorITCase.java      |   8 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |   5 +-
 .../runtime/operators/sink/SinkTestUtil.java       |   4 +-
 .../SinkV2CommitterOperatorDeprecatedTest.java     |   3 -
 .../sink/SinkV2CommitterOperatorTest.java          |  13 +-
 .../SinkV2SinkWriterOperatorDeprecatedTest.java    |  88 ++++-----
 .../sink/SinkV2SinkWriterOperatorTest.java         |  76 ++++----
 .../operators/sink/SinkWriterOperatorTestBase.java | 217 +++++++++------------
 .../streaming/runtime/operators/sink/TestSink.java |  98 ++++------
 .../runtime/operators/sink/TestSinkV2.java         | 111 +++++------
 .../sink/WithAdapterCommitterOperatorTest.java     |  13 +-
 .../sink/WithAdapterSinkWriterOperatorTest.java    |  84 ++++----
 .../operators/sink/deprecated/TestSinkV2.java      |  82 ++++----
 .../nodes/exec/common/CommonExecSinkITCase.java    |   5 +-
 .../flink/test/streaming/runtime/SinkITCase.java   |   3 -
 .../runtime/SinkV2MetricsDeprecatedITCase.java     |   1 -
 .../streaming/runtime/SinkV2MetricsITCase.java     |   5 +-
 18 files changed, 400 insertions(+), 474 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java
 
b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java
new file mode 100644
index 00000000000..03056be26a9
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializerAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Adapter for {@link TypeSerializer} to {@link SimpleVersionedSerializer}. 
The implementation is
+ * naive and should only be used for non-critical paths and tests.
+ */
+@Internal
+public class SimpleVersionedSerializerAdapter<T>
+        implements SimpleVersionedSerializer<T>, Serializable {
+    private final TypeSerializer<T> serializer;
+
+    public SimpleVersionedSerializerAdapter(TypeSerializer<T> serializer) {
+        this.serializer = serializer;
+    }
+
+    public int getVersion() {
+        return serializer.snapshotConfiguration().getCurrentVersion();
+    }
+
+    public byte[] serialize(T value) throws IOException {
+        DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(10);
+        serializer.serialize(value, dataOutputSerializer);
+        return dataOutputSerializer.getCopyOfBuffer();
+    }
+
+    public T deserialize(int version, byte[] serialized) throws IOException {
+        DataInputDeserializer dataInputDeserializer = new 
DataInputDeserializer(serialized);
+        T value = serializer.deserialize(dataInputDeserializer);
+        dataInputDeserializer.releaseArrays();
+        return value;
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
index 8f4ddf77389..2685d57b6da 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV1TransformationTranslatorITCase.java
@@ -121,13 +121,7 @@ class SinkV1TransformationTranslatorITCase
     void generateWriterGlobalCommitterTopology() {
         final StreamGraph streamGraph =
                 buildGraph(
-                        TestSink.newBuilder()
-                                .setCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
-                                .setGlobalCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
-                                .setDefaultGlobalCommitter()
-                                .build(),
+                        
TestSink.newBuilder().setDefaultGlobalCommitter().build(),
                         runtimeExecutionMode);
 
         final StreamNode sourceNode = findNodeName(streamGraph, node -> 
node.contains("Source"));
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 34085974363..1972d9e7697 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
@@ -55,6 +56,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -115,7 +117,6 @@ import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformatio
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
-import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -2684,7 +2685,7 @@ class StreamingJobGraphGeneratorTest {
 
         @Override
         public SimpleVersionedSerializer<String> getWriteResultSerializer() {
-            return new TestSinkV2.StringSerializer();
+            return new 
SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
         }
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
index b457b5a0b78..cfdae58596c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkTestUtil.java
@@ -52,7 +52,7 @@ class SinkTestUtil {
     static byte[] toBytes(String obj) {
         try {
             return SimpleVersionedSerialization.writeVersionAndSerialize(
-                    TestSinkV2.StringSerializer.INSTANCE, obj);
+                    TestSinkV2.COMMITTABLE_SERIALIZER, obj);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
@@ -83,7 +83,7 @@ class SinkTestUtil {
     static String fromBytes(byte[] obj) {
         try {
             return SimpleVersionedSerialization.readVersionAndDeSerialize(
-                    TestSinkV2.StringSerializer.INSTANCE, obj);
+                    TestSinkV2.COMMITTABLE_SERIALIZER, obj);
         } catch (IOException e) {
             throw new IllegalStateException(e);
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
index 51d2fa0d8d0..3aacaa6d1f8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
@@ -35,7 +35,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends 
CommitterOperatorTestBase {
                 (TwoPhaseCommittingSink<?, String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(committer)
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .setWithPostCommitTopology(true)
                                 .build(),
                 () -> committer.successfulCommits);
@@ -47,7 +46,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends 
CommitterOperatorTestBase {
                 (TwoPhaseCommittingSink<?, String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .setWithPostCommitTopology(true)
                                 .build(),
                 () -> 0);
@@ -60,7 +58,6 @@ class SinkV2CommitterOperatorDeprecatedTest extends 
CommitterOperatorTestBase {
                 (TwoPhaseCommittingSink<?, String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(committer)
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .setWithPostCommitTopology(false)
                                 .build(),
                 () -> committer.successfulCommits);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 349e3961e43..0112b5cf862 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -30,7 +30,6 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(committer)
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .setWithPostCommitTopology(true)
                                 .build(),
                 () -> committer.successfulCommits);
@@ -42,7 +41,6 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .setWithPostCommitTopology(true)
                                 .build(),
                 () -> 0);
@@ -52,12 +50,11 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
     SinkAndCounters sinkWithoutPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
-                (SupportsCommitter<String>)
-                        TestSinkV2.newBuilder()
-                                .setCommitter(committer)
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
-                                .setWithPostCommitTopology(false)
-                                .build(),
+                TestSinkV2.newBuilder()
+                        .setCommitter(committer)
+                        .setWithPostCommitTopology(false)
+                        .build()
+                        .asSupportsCommitter(),
                 () -> committer.successfulCommits);
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
index bf06065b17c..4e0c3ad0cec 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
 
-import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
-
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -35,67 +32,49 @@ import java.util.List;
  */
 @Deprecated
 class SinkV2SinkWriterOperatorDeprecatedTest extends 
SinkWriterOperatorTestBase {
-
     @Override
-    SinkAndSuppliers sinkWithoutCommitter() {
+    InspectableSink sinkWithoutCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
-        return new SinkAndSuppliers(
-                TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+        return new 
InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithCommitter() {
+    InspectableSink sinkWithCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new SinkAndSuppliers(
+        return new InspectableSink(
                 TestSinkV2.<Integer>newBuilder()
-                        .setWriter(sinkWriter)
                         .setDefaultCommitter()
-                        .build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+                        .setWriter(sinkWriter)
+                        .build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithTimeBasedWriter() {
+    InspectableSink sinkWithTimeBasedWriter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
-        return new SinkAndSuppliers(
+        return new InspectableSink(
                 TestSinkV2.<Integer>newBuilder()
                         .setWriter(sinkWriter)
                         .setDefaultCommitter()
-                        .build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+                        .build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
-        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+    InspectableSink sinkWithState(boolean withState, String stateName) {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+                new TestSinkV2.DefaultStatefulSinkWriter<>();
         TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.newBuilder()
-                        .setWriter(sinkWriter)
+                TestSinkV2.<Integer>newBuilder()
                         .setDefaultCommitter()
-                        .setWithPostCommitTopology(true);
+                        .setWithPostCommitTopology(true)
+                        .setWriter(sinkWriter);
         if (withState) {
             builder.setWriterState(true);
         }
         if (stateName != null) {
             builder.setCompatibleStateNames(stateName);
         }
-        return new SinkAndSuppliers(
-                builder.build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> sinkWriter.lastCheckpointId,
-                () -> new TestSinkV2.StringSerializer());
+        return new InspectableSink(builder.build());
     }
 
     private static class TimeBasedBufferingSinkWriter
@@ -125,31 +104,30 @@ class SinkV2SinkWriterOperatorDeprecatedTest extends 
SinkWriterOperatorTestBase
         }
     }
 
-    private static class SnapshottingBufferingSinkWriter
-            extends TestSinkV2.DefaultStatefulSinkWriter {
-        public static final int NOT_SNAPSHOTTED = -1;
-        long lastCheckpointId = NOT_SNAPSHOTTED;
-        boolean endOfInput = false;
+    static class InspectableSink extends 
AbstractInspectableSink<TestSinkV2<Integer>> {
+        InspectableSink(TestSinkV2<Integer> sink) {
+            super(sink);
+        }
+
+        @Override
+        public long getLastCheckpointId() {
+            return getSink().getWriter().lastCheckpointId;
+        }
 
         @Override
-        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
-            this.endOfInput = endOfInput;
+        public List<String> getRecordsOfCurrentCheckpoint() {
+            return getSink().getWriter().elements;
         }
 
         @Override
-        public List<String> snapshotState(long checkpointId) throws 
IOException {
-            lastCheckpointId = checkpointId;
-            return super.snapshotState(checkpointId);
+        public List<Watermark> getWatermarks() {
+            return getSink().getWriter().watermarks;
         }
 
         @Override
-        public Collection<String> prepareCommit() {
-            if (!endOfInput) {
-                return ImmutableList.of();
-            }
-            List<String> result = elements;
-            elements = new ArrayList<>();
-            return result;
+        public int getRecordCountFromState() {
+            return ((TestSinkV2.DefaultStatefulSinkWriter<?>) 
getSink().getWriter())
+                    .getRecordCount();
         }
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
index d46cf0c05fc..2d41721cbfb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -30,67 +31,49 @@ import java.util.Collection;
 import java.util.List;
 
 class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-
     @Override
-    SinkAndSuppliers sinkWithoutCommitter() {
+    InspectableSink sinkWithoutCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
-        return new SinkAndSuppliers(
-                TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+        return new 
InspectableSink(TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithCommitter() {
+    InspectableSink sinkWithCommitter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
                 new TestSinkV2.DefaultCommittingSinkWriter<>();
-        return new SinkAndSuppliers(
+        return new InspectableSink(
                 TestSinkV2.<Integer>newBuilder()
                         .setWriter(sinkWriter)
                         .setDefaultCommitter()
-                        .build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+                        .build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithTimeBasedWriter() {
+    InspectableSink sinkWithTimeBasedWriter() {
         TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
-        return new SinkAndSuppliers(
+        return new InspectableSink(
                 TestSinkV2.<Integer>newBuilder()
                         .setWriter(sinkWriter)
                         .setDefaultCommitter()
-                        .build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                TestSinkV2.StringSerializer::new);
+                        .build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
-        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+    InspectableSink sinkWithState(boolean withState, String stateName) {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+                new TestSinkV2.DefaultStatefulSinkWriter<>();
         TestSinkV2.Builder<Integer> builder =
-                TestSinkV2.newBuilder()
-                        .setWriter(sinkWriter)
+                TestSinkV2.<Integer>newBuilder()
                         .setDefaultCommitter()
-                        .setWithPostCommitTopology(true);
+                        .setWithPostCommitTopology(true)
+                        .setWriter(sinkWriter);
         if (withState) {
             builder.setWriterState(true);
         }
         if (stateName != null) {
             builder.setCompatibleStateNames(stateName);
         }
-        return new SinkAndSuppliers(
-                builder.build(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> sinkWriter.lastCheckpointId,
-                () -> new TestSinkV2.StringSerializer());
+        return new InspectableSink(builder.build());
     }
 
     private static class TimeBasedBufferingSinkWriter
@@ -147,4 +130,31 @@ class SinkV2SinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
             return result;
         }
     }
+
+    static class InspectableSink extends 
AbstractInspectableSink<TestSinkV2<Integer>> {
+        InspectableSink(TestSinkV2<Integer> sink) {
+            super(sink);
+        }
+
+        @Override
+        public long getLastCheckpointId() {
+            return getSink().getWriter().lastCheckpointId;
+        }
+
+        @Override
+        public List<String> getRecordsOfCurrentCheckpoint() {
+            return getSink().getWriter().elements;
+        }
+
+        @Override
+        public List<Watermark> getWatermarks() {
+            return getSink().getWriter().watermarks;
+        }
+
+        @Override
+        public int getRecordCountFromState() {
+            return ((TestSinkV2.DefaultStatefulSinkWriter<?>) 
getSink().getWriter())
+                    .getRecordCount();
+        }
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index 4ede8a6ee83..d6a40dc24b3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
@@ -66,8 +65,6 @@ import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
 
 import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
@@ -77,21 +74,21 @@ abstract class SinkWriterOperatorTestBase {
 
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
-        SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
+        InspectableSink sink = sinkWithoutCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
         assertThat(testHarness.getOutput()).isEmpty();
-        assertThat(sinkAndSuppliers.elementSupplier.get())
+        assertThat(sink.getRecordsOfCurrentCheckpoint())
                 .containsOnly("(1,1," + Long.MIN_VALUE + ")");
 
         testHarness.prepareSnapshotPreBarrier(1);
         assertThat(testHarness.getOutput()).isEmpty();
         // Elements are flushed
-        assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
         testHarness.close();
     }
 
@@ -99,10 +96,10 @@ abstract class SinkWriterOperatorTestBase {
     void testWatermarkPropagatedToSinkWriter() throws Exception {
         final long initialTime = 0;
 
-        SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
+        InspectableSink sink = sinkWithoutCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
         testHarness.open();
 
         testHarness.processWatermark(initialTime);
@@ -110,7 +107,7 @@ abstract class SinkWriterOperatorTestBase {
 
         assertThat(testHarness.getOutput())
                 .containsExactly(new Watermark(initialTime), new 
Watermark(initialTime + 1));
-        assertThat(sinkAndSuppliers.watermarkSupplier.get())
+        assertThat(sink.getWatermarks())
                 .containsExactly(
                         new 
org.apache.flink.api.common.eventtime.Watermark(initialTime),
                         new 
org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));
@@ -123,7 +120,7 @@ abstract class SinkWriterOperatorTestBase {
 
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().sink));
+                        new 
SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
 
         testHarness.open();
 
@@ -150,7 +147,7 @@ abstract class SinkWriterOperatorTestBase {
     void testEmitOnFlushWithCommitter() throws Exception {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().sink));
+                        new 
SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
 
         testHarness.open();
         assertThat(testHarness.getOutput()).isEmpty();
@@ -168,7 +165,7 @@ abstract class SinkWriterOperatorTestBase {
     @Test
     void testEmitOnEndOfInputInBatchMode() throws Exception {
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(sinkWithCommitter().sink);
+                new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink());
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
 
@@ -186,9 +183,10 @@ abstract class SinkWriterOperatorTestBase {
 
         final long initialTime = 0;
 
-        final SinkAndSuppliers sinkAndSuppliers = 
sinkWithSnapshottingWriter(stateful, null);
+        final InspectableSink sink = sinkWithState(stateful, null);
+        Sink<Integer> sink2 = sink.getSink();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
-                
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
+                new OneInputStreamOperatorTestHarness<>(new 
SinkWriterOperatorFactory<>(sink2));
 
         testHarness.open();
 
@@ -199,39 +197,23 @@ abstract class SinkWriterOperatorTestBase {
         testHarness.prepareSnapshotPreBarrier(1L);
         OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);
 
-        // we see the watermark and the committable summary, so the 
committables must be stored in
-        // state
-        assertThat(testHarness.getOutput()).hasSize(2).contains(new 
Watermark(initialTime));
-        assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong())
-                .isEqualTo(stateful ? 1L : -1L);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(2);
+        assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
 
         testHarness.close();
 
-        final SinkAndSuppliers restoredSink = 
sinkWithSnapshottingWriter(stateful, null);
+        final InspectableSink restoredSink = sinkWithState(stateful, null);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-                restoredTestHarness = 
createTestHarnessWithBufferingSinkWriter(restoredSink.sink);
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(restoredSink.getSink()));
 
         restoredTestHarness.initializeState(snapshot);
         restoredTestHarness.open();
 
-        // this will flush out the committables that were restored
-        restoredTestHarness.endInput();
-        final long checkpointId = 2;
-        restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
-        restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId);
-
-        if (stateful) {
-            assertBasicOutput(restoredTestHarness.getOutput(), 2, EOI);
-        } else {
-            
assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue())
-                    .isInstanceOf(CommittableSummary.class)
-                    .satisfies(
-                            cs ->
-                                    
SinkV2Assertions.assertThat((CommittableSummary<?>) cs)
-                                            .hasOverallCommittables(0)
-                                            .hasPendingCommittables(0)
-                                            .hasFailedCommittables(0));
-        }
+        // check that the previous state is correctly restored
+        assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful 
? 2 : 0);
+
         restoredTestHarness.close();
     }
 
@@ -244,66 +226,46 @@ abstract class SinkWriterOperatorTestBase {
                         "bit", "mention", "thick", "stick", "stir", "easy", 
"sleep", "forth",
                         "cost", "prompt");
 
-        SinkAndSuppliers sinkAndSuppliers =
-                sinkWithSnapshottingWriter(stateful, 
DummySinkOperator.DUMMY_SINK_STATE_NAME);
+        InspectableSink sink = sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
+        int expectedState = 5;
         final OneInputStreamOperatorTestHarness<String, String> previousSink =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()),
+                        new CompatibleStateSinkOperator<>(
+                                TestSinkV2.WRITER_SERIALIZER, expectedState),
                         StringSerializer.INSTANCE);
 
         OperatorSubtaskState previousSinkState =
                 TestHarnessUtil.buildSubtaskState(previousSink, 
previousSinkInputs);
 
-        // 2. Load previous sink state and verify the output
+        // 2. Load previous sink state and verify state
+        Sink<Integer> sink3 = sink.getSink();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
                 compatibleWriterOperator =
-                        
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
-
-        final List<String> expectedOutput1 =
-                stateful ? new ArrayList<>(previousSinkInputs) : new 
ArrayList<>();
-        expectedOutput1.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString());
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink3));
 
         // load the state from previous sink
         compatibleWriterOperator.initializeState(previousSinkState);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
 
-        compatibleWriterOperator.open();
-
-        compatibleWriterOperator.processElement(1, 1);
-
-        // this will flush out the committables that were restored from 
previous sink
-        compatibleWriterOperator.endInput();
-        compatibleWriterOperator.prepareSnapshotPreBarrier(1);
-
-        OperatorSubtaskState operatorStateWithoutPreviousState =
-                compatibleWriterOperator.snapshot(1L, 1L);
+        // 3. do another snapshot and check if this also can be restored 
without compatible state
+        // name
+        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
+        OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 
1L);
 
         compatibleWriterOperator.close();
 
-        assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());
-
-        // 3. Restore the sink without previous sink's state
-        SinkAndSuppliers sinkAndSuppliers2 =
-                sinkWithSnapshottingWriter(stateful, 
DummySinkOperator.DUMMY_SINK_STATE_NAME);
+        // 4. Restore the sink without previous sink's state
+        InspectableSink sink2 =
+                sinkWithState(stateful, 
CompatibleStateSinkOperator.SINK_STATE_NAME);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
                 restoredSinkOperator =
-                        
createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers2.sink);
-        final List<String> expectedOutput2 =
-                Arrays.asList(
-                        Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
-                        Tuple3.of(3, 3, Long.MIN_VALUE).toString());
-
-        
restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
-
-        restoredSinkOperator.open();
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(sink2.getSink()));
 
-        restoredSinkOperator.processElement(2, 2);
-        restoredSinkOperator.processElement(3, 3);
+        restoredSinkOperator.initializeState(snapshot);
+        assertThat(sink.getRecordCountFromState()).isEqualTo(stateful ? 
expectedState : 0);
 
-        // this will flush out the committables that were restored
-        restoredSinkOperator.endInput();
-        restoredSinkOperator.prepareSnapshotPreBarrier(2);
-
-        assertEmitted(expectedOutput2, restoredSinkOperator.getOutput());
         restoredSinkOperator.close();
     }
 
@@ -311,10 +273,10 @@ abstract class SinkWriterOperatorTestBase {
     void testRestoreCommitterState() throws Exception {
         final List<String> committables = Arrays.asList("state1", "state2");
 
-        SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
+        InspectableSink sink = sinkWithCommitter();
         final OneInputStreamOperatorTestHarness<String, String> committer =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()),
+                        new 
TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
                         StringSerializer.INSTANCE);
 
         final OperatorSubtaskState committerState =
@@ -322,7 +284,7 @@ abstract class SinkWriterOperatorTestBase {
 
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
 
         testHarness.initializeState(committerState);
 
@@ -362,16 +324,16 @@ abstract class SinkWriterOperatorTestBase {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
-        SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
+        InspectableSink sink = sinkWithCommitter();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
-                        new 
SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
+                        new SinkWriterOperatorFactory<>(sink.getSink()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
         assertThat(testHarness.getOutput()).isEmpty();
         final String record = "(1,1," + Long.MIN_VALUE + ")";
-        
assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(record);
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);
 
         testHarness.endInput();
 
@@ -380,7 +342,7 @@ abstract class SinkWriterOperatorTestBase {
         }
 
         assertEmitted(Collections.singletonList(record), 
testHarness.getOutput());
-        assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();
+        assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
 
         testHarness.close();
     }
@@ -556,13 +518,6 @@ abstract class SinkWriterOperatorTestBase {
         assertThat(committables).containsExactlyInAnyOrderElementsOf(records);
     }
 
-    private static OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
-            createTestHarnessWithBufferingSinkWriter(Sink sink) throws 
Exception {
-        final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(sink);
-        return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
-    }
-
     private static void assertBasicOutput(
             Collection<Object> queuedOutput, int numberOfCommittables, long 
checkpointId) {
         List<StreamElement> output = fromOutput(queuedOutput);
@@ -622,19 +577,22 @@ abstract class SinkWriterOperatorTestBase {
         }
     }
 
-    private static class DummySinkOperator extends 
AbstractStreamOperator<String>
+    /** Writes state to test whether the sink can read from alternative state 
names. */
+    private static class CompatibleStateSinkOperator<T> extends 
AbstractStreamOperator<String>
             implements OneInputStreamOperator<String, String> {
 
-        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
+        static final String SINK_STATE_NAME = "compatible_sink_state";
 
         static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
-                new ListStateDescriptor<>(
-                        DUMMY_SINK_STATE_NAME, 
BytePrimitiveArraySerializer.INSTANCE);
-        ListState<String> sinkState;
-        private final SimpleVersionedSerializer<String> serializer;
+                new ListStateDescriptor<>(SINK_STATE_NAME, 
BytePrimitiveArraySerializer.INSTANCE);
+        ListState<T> sinkState;
+        private final SimpleVersionedSerializer<T> serializer;
+        private final T initialState;
 
-        public DummySinkOperator(SimpleVersionedSerializer<String> serializer) 
{
+        public CompatibleStateSinkOperator(
+                SimpleVersionedSerializer<T> serializer, T initialState) {
             this.serializer = serializer;
+            this.initialState = initialState;
         }
 
         public void initializeState(StateInitializationContext context) throws 
Exception {
@@ -643,11 +601,14 @@ abstract class SinkWriterOperatorTestBase {
                     new SimpleVersionedListState<>(
                             
context.getOperatorStateStore().getListState(SINK_STATE_DESC),
                             serializer);
+            if (!context.isRestored()) {
+                sinkState.add(initialState);
+            }
         }
 
         @Override
-        public void processElement(StreamRecord<String> element) throws 
Exception {
-            sinkState.add(element.getValue());
+        public void processElement(StreamRecord<String> element) {
+            // do nothing
         }
     }
 
@@ -672,32 +633,42 @@ abstract class SinkWriterOperatorTestBase {
         }
     }
 
-    abstract SinkAndSuppliers sinkWithoutCommitter();
+    abstract InspectableSink sinkWithoutCommitter();
+
+    abstract InspectableSink sinkWithTimeBasedWriter();
+
+    abstract InspectableSink sinkWithState(boolean withState, String 
stateName);
+
+    abstract InspectableSink sinkWithCommitter();
 
-    abstract SinkAndSuppliers sinkWithTimeBasedWriter();
+    /**
+     * Basic abstraction to access the different flavors of sinks. Remove once 
the older interfaces
+     * are removed.
+     */
+    interface InspectableSink {
+        long getLastCheckpointId();
 
-    abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, 
String stateName);
+        List<String> getRecordsOfCurrentCheckpoint();
 
-    abstract SinkAndSuppliers sinkWithCommitter();
+        List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks();
 
-    static class SinkAndSuppliers {
-        org.apache.flink.api.connector.sink2.Sink<Integer> sink;
-        Supplier<List<String>> elementSupplier;
-        Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> 
watermarkSupplier;
-        LongSupplier lastCheckpointSupplier;
-        Supplier<SimpleVersionedSerializer<String>> serializerSupplier;
+        int getRecordCountFromState();
 
-        public SinkAndSuppliers(
-                org.apache.flink.api.connector.sink2.Sink<Integer> sink,
-                Supplier<List<String>> elementSupplier,
-                
Supplier<List<org.apache.flink.api.common.eventtime.Watermark>> 
watermarkSupplier,
-                LongSupplier lastCheckpointSupplier,
-                Supplier<SimpleVersionedSerializer<String>> 
serializerSupplier) {
+        Sink<Integer> getSink();
+    }
+
+    abstract static class AbstractInspectableSink<
+                    S extends 
org.apache.flink.api.connector.sink2.Sink<Integer>>
+            implements InspectableSink {
+        private final S sink;
+
+        protected AbstractInspectableSink(S sink) {
             this.sink = sink;
-            this.elementSupplier = elementSupplier;
-            this.watermarkSupplier = watermarkSupplier;
-            this.lastCheckpointSupplier = lastCheckpointSupplier;
-            this.serializerSupplier = serializerSupplier;
+        }
+
+        @Override
+        public S getSink() {
+            return sink;
         }
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
index a062181cc88..22b988a00db 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
 
 import javax.annotation.Nullable;
@@ -56,13 +56,17 @@ import static org.assertj.core.api.Assertions.assertThat;
  * @deprecated Use {@link TestSinkV2} instead.
  */
 @Deprecated
-public class TestSink<T> implements Sink<T, String, String, String> {
+public class TestSink<T> implements Sink<T, String, Integer, String> {
+    public static final SimpleVersionedSerializerAdapter<String> 
COMMITTABLE_SERIALIZER =
+            TestSinkV2.COMMITTABLE_SERIALIZER;
+    public static final SimpleVersionedSerializerAdapter<Integer> 
WRITER_SERIALIZER =
+            TestSinkV2.WRITER_SERIALIZER;
 
     public static final String END_OF_INPUT_STR = "end of input";
 
     private final DefaultSinkWriter<T> writer;
 
-    @Nullable private final SimpleVersionedSerializer<String> 
writerStateSerializer;
+    @Nullable private final SimpleVersionedSerializer<Integer> 
writerStateSerializer;
 
     @Nullable private final Committer<String> committer;
 
@@ -76,7 +80,7 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
 
     private TestSink(
             DefaultSinkWriter<T> writer,
-            @Nullable SimpleVersionedSerializer<String> writerStateSerializer,
+            @Nullable SimpleVersionedSerializer<Integer> writerStateSerializer,
             @Nullable Committer<String> committer,
             @Nullable SimpleVersionedSerializer<String> committableSerializer,
             @Nullable GlobalCommitter<String, String> globalCommitter,
@@ -92,7 +96,7 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
     }
 
     @Override
-    public SinkWriter<T, String, String> createWriter(InitContext context, 
List<String> states) {
+    public SinkWriter<T, String, Integer> createWriter(InitContext context, 
List<Integer> states) {
         writer.init(context);
         writer.restoredFrom(states);
         writer.setProcessingTimerService(context.getProcessingTimeService());
@@ -120,7 +124,7 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<String>> 
getWriterStateSerializer() {
+    public Optional<SimpleVersionedSerializer<Integer>> 
getWriterStateSerializer() {
         return Optional.ofNullable(writerStateSerializer);
     }
 
@@ -137,30 +141,30 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
         return SinkV1Adapter.wrap(this);
     }
 
+    public DefaultSinkWriter<T> getWriter() {
+        return writer;
+    }
+
     /** A builder class for {@link TestSink}. */
     public static class Builder<T> {
 
-        private DefaultSinkWriter writer = new DefaultSinkWriter();
+        private DefaultSinkWriter<T> writer = new DefaultSinkWriter<>();
 
-        private SimpleVersionedSerializer<String> writerStateSerializer;
+        private SimpleVersionedSerializer<Integer> writerStateSerializer;
 
         private Committer<String> committer;
 
-        private SimpleVersionedSerializer<String> committableSerializer;
-
         private GlobalCommitter<String, String> globalCommitter;
 
-        private SimpleVersionedSerializer<String> globalCommittableSerializer;
-
         private Collection<String> compatibleStateNames = 
Collections.emptyList();
 
         public <W> Builder<W> setWriter(DefaultSinkWriter<W> writer) {
-            this.writer = checkNotNull(writer);
+            this.writer = (DefaultSinkWriter<T>) checkNotNull(writer);
             return (Builder<W>) this;
         }
 
         public Builder<T> withWriterState() {
-            this.writerStateSerializer = StringCommittableSerializer.INSTANCE;
+            this.writerStateSerializer = WRITER_SERIALIZER;
             return this;
         }
 
@@ -169,39 +173,23 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
             return this;
         }
 
-        public Builder<T> setCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
-            this.committableSerializer = committableSerializer;
-            return this;
-        }
-
         public Builder<T> setDefaultCommitter() {
             this.committer = new DefaultCommitter();
-            this.committableSerializer = StringCommittableSerializer.INSTANCE;
             return this;
         }
 
         public Builder<T> setDefaultCommitter(Supplier<Queue<String>> 
queueSupplier) {
             this.committer = new DefaultCommitter(queueSupplier);
-            this.committableSerializer = StringCommittableSerializer.INSTANCE;
-            return this;
-        }
-
-        public Builder<T> setGlobalCommittableSerializer(
-                SimpleVersionedSerializer<String> globalCommittableSerializer) 
{
-            this.globalCommittableSerializer = globalCommittableSerializer;
             return this;
         }
 
         public Builder<T> setDefaultGlobalCommitter() {
             this.globalCommitter = new DefaultGlobalCommitter("");
-            this.globalCommittableSerializer = 
StringCommittableSerializer.INSTANCE;
             return this;
         }
 
         public Builder<T> setGlobalCommitter(Supplier<Queue<String>> 
queueSupplier) {
             this.globalCommitter = new DefaultGlobalCommitter(queueSupplier);
-            this.globalCommittableSerializer = 
StringCommittableSerializer.INSTANCE;
             return this;
         }
 
@@ -219,9 +207,9 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
                     writer,
                     writerStateSerializer,
                     committer,
-                    committableSerializer,
+                    committer == null && globalCommitter == null ? null : 
COMMITTABLE_SERIALIZER,
                     globalCommitter,
-                    globalCommittableSerializer,
+                    globalCommitter == null ? null : COMMITTABLE_SERIALIZER,
                     compatibleStateNames);
         }
     }
@@ -230,7 +218,7 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
 
     /** Base class for out testing {@link SinkWriter Writers}. */
     public static class DefaultSinkWriter<T>
-            implements SinkWriter<T, String, String>, Serializable {
+            implements SinkWriter<T, String, Integer>, Serializable {
 
         protected List<String> elements;
 
@@ -238,6 +226,10 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
 
         protected ProcessingTimeService processingTimerService;
 
+        private int recordCount;
+
+        protected long lastCheckpointId = -1;
+
         protected DefaultSinkWriter() {
             this.elements = new ArrayList<>();
             this.watermarks = new ArrayList<>();
@@ -247,6 +239,7 @@ public class TestSink<T> implements Sink<T, String, String, 
String> {
         public void write(T element, Context context) {
             elements.add(
                     Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+            recordCount++;
         }
 
         @Override
@@ -262,20 +255,27 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
         }
 
         @Override
-        public List<String> snapshotState(long checkpointId) throws 
IOException {
-            return Collections.emptyList();
+        public List<Integer> snapshotState(long checkpointId) throws 
IOException {
+            lastCheckpointId = checkpointId;
+            return Collections.singletonList(recordCount);
         }
 
         @Override
         public void close() throws Exception {}
 
-        void restoredFrom(List<String> states) {}
+        void restoredFrom(List<Integer> states) {
+            recordCount = states.isEmpty() ? 0 : states.get(0);
+        }
 
         void setProcessingTimerService(ProcessingTimeService 
processingTimerService) {
             this.processingTimerService = processingTimerService;
         }
 
         public void init(InitContext context) {}
+
+        public int getRecordCount() {
+            return recordCount;
+        }
     }
 
     // -------------------------------------- Sink Committer 
---------------------------------------
@@ -393,30 +393,4 @@ public class TestSink<T> implements Sink<T, String, 
String, String> {
             commit(Collections.singletonList(END_OF_INPUT_STR));
         }
     }
-
-    /**
-     * We introduce this {@link StringCommittableSerializer} is because that 
all the fields of
-     * {@link TestSink} should be serializable.
-     */
-    public static class StringCommittableSerializer
-            implements SimpleVersionedSerializer<String>, Serializable {
-
-        public static final StringCommittableSerializer INSTANCE =
-                new StringCommittableSerializer();
-
-        @Override
-        public int getVersion() {
-            return SimpleVersionedStringSerializer.INSTANCE.getVersion();
-        }
-
-        @Override
-        public byte[] serialize(String obj) {
-            return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
-        }
-
-        @Override
-        public String deserialize(int version, byte[] serialized) throws 
IOException {
-            return 
SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
-        }
-    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index e11f71c37ee..9caf7a8f339 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
@@ -30,6 +32,7 @@ import 
org.apache.flink.api.connector.sink2.SupportsWriterState;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -37,7 +40,6 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
 import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet;
@@ -61,6 +63,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** A {@link Sink} for all the sink related tests. */
 public class TestSinkV2<InputT> implements Sink<InputT> {
+    public static final SimpleVersionedSerializerAdapter<String> 
COMMITTABLE_SERIALIZER =
+            new SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
+    public static final SimpleVersionedSerializerAdapter<Integer> 
WRITER_SERIALIZER =
+            new SimpleVersionedSerializerAdapter<>(IntSerializer.INSTANCE);
 
     private final DefaultSinkWriter<InputT> writer;
 
@@ -81,11 +87,18 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         return new Builder<>();
     }
 
+    public static <InputT> Builder<InputT> 
newBuilder(DefaultSinkWriter<InputT> writer) {
+        return new Builder<InputT>().setWriter(writer);
+    }
+
+    public SupportsCommitter<String> asSupportsCommitter() {
+        throw new UnsupportedOperationException("No committter");
+    }
+
     /** A builder class for {@link TestSinkV2}. */
     public static class Builder<InputT> {
         private DefaultSinkWriter<InputT> writer = null;
         private DefaultCommitter committer;
-        private SimpleVersionedSerializer<String> committableSerializer;
         private boolean withPostCommitTopology = false;
         private boolean withPreCommitTopology = false;
         private boolean withWriterState = false;
@@ -101,22 +114,14 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
             return this;
         }
 
-        public Builder<InputT> setCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
-            this.committableSerializer = committableSerializer;
-            return this;
-        }
-
         public Builder<InputT> setDefaultCommitter() {
             this.committer = new DefaultCommitter();
-            this.committableSerializer = StringSerializer.INSTANCE;
             return this;
         }
 
         public Builder<InputT> setDefaultCommitter(
                 Supplier<Queue<Committer.CommitRequest<String>>> 
queueSupplier) {
             this.committer = new DefaultCommitter(queueSupplier);
-            this.committableSerializer = StringSerializer.INSTANCE;
             return this;
         }
 
@@ -155,7 +160,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                     if (!withPreCommitTopology) {
                         // TwoPhaseCommittingSink with a stateless writer and 
a committer
                         return new TestSinkV2TwoPhaseCommittingSink<>(
-                                writer, committableSerializer, committer);
+                                writer, COMMITTABLE_SERIALIZER, committer);
                     } else {
                         // TwoPhaseCommittingSink with a stateless writer, pre 
commit topology,
                         // committer
@@ -163,9 +168,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                                 writer instanceof DefaultCommittingSinkWriter,
                                 "Please provide a DefaultCommittingSinkWriter 
instance");
                         return new TestSinkV2WithPreCommitTopology<>(
-                                (DefaultCommittingSinkWriter) writer,
-                                committableSerializer,
-                                committer);
+                                writer, COMMITTABLE_SERIALIZER, committer);
                     }
                 } else {
                     if (withWriterState) {
@@ -174,9 +177,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                         Preconditions.checkArgument(
                                 writer instanceof DefaultStatefulSinkWriter,
                                 "Please provide a DefaultStatefulSinkWriter 
instance");
-                        return new TestStatefulSinkV2(
-                                (DefaultStatefulSinkWriter) writer,
-                                committableSerializer,
+                        return new TestStatefulSinkV2<>(
+                                (DefaultStatefulSinkWriter<InputT>) writer,
+                                COMMITTABLE_SERIALIZER,
                                 committer,
                                 compatibleStateNames);
                     } else {
@@ -186,9 +189,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                                 writer instanceof DefaultCommittingSinkWriter,
                                 "Please provide a DefaultCommittingSinkWriter 
instance");
                         return new TestSinkV2WithPostCommitTopology<>(
-                                (DefaultCommittingSinkWriter) writer,
-                                committableSerializer,
-                                committer);
+                                writer, COMMITTABLE_SERIALIZER, committer);
                     }
                 }
             }
@@ -215,6 +216,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
             return committer;
         }
 
+        @Override
+        public SupportsCommitter<String> asSupportsCommitter() {
+            return this;
+        }
+
         @Override
         public SimpleVersionedSerializer<String> getCommittableSerializer() {
             return committableSerializer;
@@ -268,19 +274,19 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                                     return withLineage.map(old -> old + 
"Transformed");
                                 }
                             })
-                    
.returns(CommittableMessageTypeInfo.of(StringSerializer::new));
+                    .returns(CommittableMessageTypeInfo.of(() -> 
COMMITTABLE_SERIALIZER));
         }
 
         @Override
         public SimpleVersionedSerializer<String> getWriteResultSerializer() {
-            return new StringSerializer();
+            return new 
SimpleVersionedSerializerAdapter<>(StringSerializer.INSTANCE);
         }
     }
 
     private static class TestStatefulSinkV2<InputT> extends 
TestSinkV2WithPostCommitTopology<InputT>
-            implements SupportsWriterState<InputT, String>,
+            implements SupportsWriterState<InputT, Integer>,
                     SupportsWriterState.WithCompatibleState {
-        private String compatibleState;
+        private final String compatibleState;
 
         public TestStatefulSinkV2(
                 DefaultStatefulSinkWriter<InputT> writer,
@@ -297,8 +303,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public StatefulSinkWriter<InputT, String> restoreWriter(
-                WriterInitContext context, Collection<String> recoveredState) {
+        public StatefulSinkWriter<InputT, Integer> restoreWriter(
+                WriterInitContext context, Collection<Integer> recoveredState) 
{
             DefaultStatefulSinkWriter<InputT> statefulWriter =
                     (DefaultStatefulSinkWriter) getWriter();
 
@@ -307,8 +313,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public SimpleVersionedSerializer<String> getWriterStateSerializer() {
-            return new StringSerializer();
+        public SimpleVersionedSerializer<Integer> getWriterStateSerializer() {
+            return WRITER_SERIALIZER;
         }
 
         @Override
@@ -326,6 +332,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
         protected List<Watermark> watermarks;
 
+        public long lastCheckpointId = -1;
+
         protected DefaultSinkWriter() {
             this.elements = new ArrayList<>();
             this.watermarks = new ArrayList<>();
@@ -380,15 +388,27 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
      */
     protected static class DefaultStatefulSinkWriter<InputT>
             extends DefaultCommittingSinkWriter<InputT>
-            implements StatefulSinkWriter<InputT, String> {
+            implements StatefulSinkWriter<InputT, Integer> {
+        private int recordCount = 0;
 
         @Override
-        public List<String> snapshotState(long checkpointId) throws 
IOException {
-            return elements;
+        public void write(InputT element, Context context) {
+            super.write(element, context);
+            recordCount++;
+        }
+
+        public int getRecordCount() {
+            return recordCount;
         }
 
-        protected void restore(Collection<String> recoveredState) {
-            this.elements = new ArrayList<>(recoveredState);
+        @Override
+        public List<Integer> snapshotState(long checkpointId) throws 
IOException {
+            lastCheckpointId = checkpointId;
+            return Collections.singletonList(recordCount);
+        }
+
+        protected void restore(Collection<Integer> recoveredState) {
+            this.recordCount = recoveredState.isEmpty() ? 0 : 
recoveredState.iterator().next();
         }
     }
 
@@ -464,29 +484,4 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                     });
         }
     }
-
-    /**
-     * We introduce this {@link StringSerializer} is because that all the 
fields of {@link
-     * TestSinkV2} should be serializable.
-     */
-    public static class StringSerializer
-            implements SimpleVersionedSerializer<String>, Serializable {
-
-        public static final StringSerializer INSTANCE = new StringSerializer();
-
-        @Override
-        public int getVersion() {
-            return SimpleVersionedStringSerializer.INSTANCE.getVersion();
-        }
-
-        @Override
-        public byte[] serialize(String obj) {
-            return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
-        }
-
-        @Override
-        public String deserialize(int version, byte[] serialized) throws 
IOException {
-            return 
SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
-        }
-    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
index 6d4c3586f38..f817a6810af 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterCommitterOperatorTest.java
@@ -33,8 +33,6 @@ class WithAdapterCommitterOperatorTest extends 
CommitterOperatorTestBase {
                         TestSink.newBuilder()
                                 .setCommitter(committer)
                                 .setDefaultGlobalCommitter()
-                                .setCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
                                 .build()
                                 .asV2(),
                 () -> committer.successfulCommits);
@@ -47,8 +45,6 @@ class WithAdapterCommitterOperatorTest extends 
CommitterOperatorTestBase {
                         TestSink.newBuilder()
                                 .setCommitter(new 
TestSink.RetryOnceCommitter())
                                 .setDefaultGlobalCommitter()
-                                .setCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
                                 .build()
                                 .asV2(),
                 () -> 0);
@@ -59,12 +55,7 @@ class WithAdapterCommitterOperatorTest extends 
CommitterOperatorTestBase {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
                 (SupportsCommitter<String>)
-                        TestSink.newBuilder()
-                                .setCommitter(committer)
-                                .setCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
-                                .build()
-                                .asV2(),
+                        
TestSink.newBuilder().setCommitter(committer).build().asV2(),
                 () -> committer.successfulCommits);
     }
 
@@ -78,6 +69,6 @@ class WithAdapterCommitterOperatorTest extends 
CommitterOperatorTestBase {
         }
 
         @Override
-        public void close() throws Exception {}
+        public void close() {}
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
index 5af5ac5a679..4e320816aea 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WithAdapterSinkWriterOperatorTest.java
@@ -18,65 +18,46 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.api.java.tuple.Tuple3;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {
-
     @Override
-    SinkAndSuppliers sinkWithoutCommitter() {
+    InspectableSink sinkWithoutCommitter() {
         TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
-        return new SinkAndSuppliers(
-                TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                () -> new TestSink.StringCommittableSerializer());
+        return new 
InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithCommitter() {
+    InspectableSink sinkWithCommitter() {
         TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
-        return new SinkAndSuppliers(
-                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                () -> new TestSink.StringCommittableSerializer());
+        return new InspectableSink(
+                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithTimeBasedWriter() {
+    InspectableSink sinkWithTimeBasedWriter() {
         TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
-        return new SinkAndSuppliers(
-                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> -1,
-                () -> new TestSink.StringCommittableSerializer());
+        return new InspectableSink(
+                
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
     }
 
     @Override
-    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
-        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+    InspectableSink sinkWithState(boolean withState, String stateName) {
+        TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
         TestSink.Builder<Integer> builder =
                 
TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter();
         if (withState) {
             builder.withWriterState();
+            if (stateName != null) {
+                builder.setCompatibleStateNames(stateName);
+            }
         }
-        if (stateName != null) {
-            builder.setCompatibleStateNames(stateName);
-        }
-        return new SinkAndSuppliers(
-                builder.build().asV2(),
-                () -> sinkWriter.elements,
-                () -> sinkWriter.watermarks,
-                () -> sinkWriter.lastCheckpointId,
-                () -> new TestSink.StringCommittableSerializer());
+        return new InspectableSink(builder.build());
     }
 
     private static class TimeBasedBufferingSinkWriter extends 
TestSink.DefaultSinkWriter<Integer>
@@ -103,30 +84,33 @@ class WithAdapterSinkWriterOperatorTest extends 
SinkWriterOperatorTestBase {
         }
     }
 
-    private static class SnapshottingBufferingSinkWriter
-            extends TestSink.DefaultSinkWriter<Integer> {
-        public static final int NOT_SNAPSHOTTED = -1;
-        long lastCheckpointId = NOT_SNAPSHOTTED;
+    class InspectableSink
+            extends 
AbstractInspectableSink<org.apache.flink.api.connector.sink2.Sink<Integer>> {
+        private final TestSink<Integer> sink;
+
+        InspectableSink(TestSink<Integer> sink) {
+            super(sink.asV2());
+            this.sink = sink;
+        }
 
         @Override
-        public List<String> snapshotState(long checkpointId) {
-            lastCheckpointId = checkpointId;
-            return elements;
+        public long getLastCheckpointId() {
+            return sink.getWriter().lastCheckpointId;
         }
 
         @Override
-        void restoredFrom(List<String> states) {
-            this.elements = new ArrayList<>(states);
+        public List<String> getRecordsOfCurrentCheckpoint() {
+            return sink.getWriter().elements;
         }
 
         @Override
-        public List<String> prepareCommit(boolean flush) {
-            if (!flush) {
-                return Collections.emptyList();
-            }
-            List<String> result = elements;
-            elements = new ArrayList<>();
-            return result;
+        public List<Watermark> getWatermarks() {
+            return sink.getWriter().watermarks;
+        }
+
+        @Override
+        public int getRecordCountFromState() {
+            return sink.getWriter().getRecordCount();
         }
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
index 5bb25605d75..7aabbcb4e49 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableSet;
@@ -60,6 +60,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Deprecated
 public class TestSinkV2<InputT> implements Sink<InputT> {
 
+    public static final SimpleVersionedSerializerAdapter<String> 
COMMITTABLE_SERIALIZER =
+            
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.COMMITTABLE_SERIALIZER;
+    public static final SimpleVersionedSerializerAdapter<Integer> 
WRITER_SERIALIZER =
+            
org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.WRITER_SERIALIZER;
     private final DefaultSinkWriter<InputT> writer;
 
     private TestSinkV2(DefaultSinkWriter<InputT> writer) {
@@ -71,7 +75,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         return writer;
     }
 
-    DefaultSinkWriter<InputT> getWriter() {
+    public DefaultSinkWriter<InputT> getWriter() {
         return writer;
     }
 
@@ -83,7 +87,6 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     public static class Builder<InputT> {
         private DefaultSinkWriter<InputT> writer = null;
         private DefaultCommitter committer;
-        private SimpleVersionedSerializer<String> committableSerializer;
         private boolean withPostCommitTopology = false;
         private boolean withWriterState = false;
         private String compatibleStateNames;
@@ -98,22 +101,14 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
             return this;
         }
 
-        public Builder<InputT> setCommittableSerializer(
-                SimpleVersionedSerializer<String> committableSerializer) {
-            this.committableSerializer = committableSerializer;
-            return this;
-        }
-
         public Builder<InputT> setDefaultCommitter() {
             this.committer = new DefaultCommitter();
-            this.committableSerializer = StringSerializer.INSTANCE;
             return this;
         }
 
         public Builder<InputT> setDefaultCommitter(
                 Supplier<Queue<Committer.CommitRequest<String>>> 
queueSupplier) {
             this.committer = new DefaultCommitter(queueSupplier);
-            this.committableSerializer = StringSerializer.INSTANCE;
             return this;
         }
 
@@ -146,7 +141,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                 if (!withPostCommitTopology) {
                     // TwoPhaseCommittingSink with a stateless writer and a 
committer
                     return new TestSinkV2TwoPhaseCommittingSink<>(
-                            writer, committableSerializer, committer);
+                            writer, COMMITTABLE_SERIALIZER, committer);
                 } else {
                     if (withWriterState) {
                         // TwoPhaseCommittingSink with a stateful writer and a 
committer and post
@@ -156,7 +151,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                                 "Please provide a DefaultStatefulSinkWriter 
instance");
                         return new TestStatefulSinkV2(
                                 (DefaultStatefulSinkWriter) writer,
-                                committableSerializer,
+                                COMMITTABLE_SERIALIZER,
                                 committer,
                                 compatibleStateNames);
                     } else {
@@ -167,7 +162,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                                 "Please provide a DefaultCommittingSinkWriter 
instance");
                         return new TestSinkV2WithPostCommitTopology<>(
                                 (DefaultCommittingSinkWriter) writer,
-                                committableSerializer,
+                                COMMITTABLE_SERIALIZER,
                                 committer);
                     }
                 }
@@ -225,7 +220,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     private static class TestStatefulSinkV2<InputT> extends 
TestSinkV2WithPostCommitTopology<InputT>
-            implements StatefulSink<InputT, String>, 
StatefulSink.WithCompatibleState {
+            implements StatefulSink<InputT, Integer>, 
StatefulSink.WithCompatibleState {
         private String compatibleState;
 
         public TestStatefulSinkV2(
@@ -243,8 +238,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public StatefulSinkWriter<InputT, String> restoreWriter(
-                InitContext context, Collection<String> recoveredState) {
+        public StatefulSinkWriter<InputT, Integer> restoreWriter(
+                InitContext context, Collection<Integer> recoveredState) {
             DefaultStatefulSinkWriter<InputT> statefulWriter =
                     (DefaultStatefulSinkWriter) getWriter();
 
@@ -253,8 +248,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public SimpleVersionedSerializer<String> getWriterStateSerializer() {
-            return new StringSerializer();
+        public SimpleVersionedSerializer<Integer> getWriterStateSerializer() {
+            return WRITER_SERIALIZER;
         }
 
         @Override
@@ -272,6 +267,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
         public List<Watermark> watermarks;
 
+        public long lastCheckpointId = -1;
+
         public DefaultSinkWriter() {
             this.elements = new ArrayList<>();
             this.watermarks = new ArrayList<>();
@@ -327,15 +324,27 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
      */
     public static class DefaultStatefulSinkWriter<InputT>
             extends DefaultCommittingSinkWriter<InputT>
-            implements StatefulSink.StatefulSinkWriter<InputT, String> {
+            implements StatefulSink.StatefulSinkWriter<InputT, Integer> {
+        private int recordCount;
 
         @Override
-        public List<String> snapshotState(long checkpointId) throws 
IOException {
-            return elements;
+        public List<Integer> snapshotState(long checkpointId) throws 
IOException {
+            lastCheckpointId = checkpointId;
+            return Collections.singletonList(recordCount);
         }
 
-        protected void restore(Collection<String> recoveredState) {
-            this.elements = new ArrayList<>(recoveredState);
+        @Override
+        public void write(InputT element, Context context) {
+            super.write(element, context);
+            recordCount++;
+        }
+
+        public int getRecordCount() {
+            return recordCount;
+        }
+
+        protected void restore(Collection<Integer> recoveredState) {
+            this.recordCount = recoveredState.isEmpty() ? 0 : 
recoveredState.iterator().next();
         }
     }
 
@@ -411,29 +420,4 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                     });
         }
     }
-
-    /**
-     * We introduce this {@link StringSerializer} is because that all the 
fields of {@link
-     * TestSinkV2} should be serializable.
-     */
-    public static class StringSerializer
-            implements SimpleVersionedSerializer<String>, Serializable {
-
-        public static final StringSerializer INSTANCE = new StringSerializer();
-
-        @Override
-        public int getVersion() {
-            return SimpleVersionedStringSerializer.INSTANCE.getVersion();
-        }
-
-        @Override
-        public byte[] serialize(String obj) {
-            return SimpleVersionedStringSerializer.INSTANCE.serialize(obj);
-        }
-
-        @Override
-        public String deserialize(int version, byte[] serialized) throws 
IOException {
-            return 
SimpleVersionedStringSerializer.INSTANCE.deserialize(version, serialized);
-        }
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index f5b31cb66a7..662568d4afe 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -492,10 +492,7 @@ class CommonExecSinkITCase {
 
     private static TestSink<RowData> buildRecordWriterTestSink(
             TestSink.DefaultSinkWriter<RowData> writer) {
-        return TestSink.newBuilder()
-                .setWriter(writer)
-                
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
-                .build();
+        return TestSink.newBuilder().setWriter(writer).build();
     }
 
     private TableFactoryHarness.SinkBase buildRuntimeSinkProvider(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index bbc08c9798e..0a3de995c9a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -226,7 +226,6 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
 
         stream.sinkTo(
                 TestSink.newBuilder()
-                        
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
                         .setGlobalCommitter(
                                 (Supplier<Queue<String>> & Serializable) () -> 
GLOBAL_COMMIT_QUEUE)
                         .build());
@@ -252,8 +251,6 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
         env.fromData(SOURCE_DATA)
                 .sinkTo(
                         TestSink.newBuilder()
-                                .setCommittableSerializer(
-                                        
TestSink.StringCommittableSerializer.INSTANCE)
                                 .setGlobalCommitter(
                                         (Supplier<Queue<String>> & 
Serializable)
                                                 () -> GLOBAL_COMMIT_QUEUE)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
index 3dea88bcf65..95a0e4c1bec 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
@@ -147,7 +147,6 @@ public class SinkV2MetricsDeprecatedITCase extends 
TestLogger {
                 .sinkTo(
                         TestSinkV2.<Long>newBuilder()
                                 .setCommitter(new MetricCommitter(beforeLatch, 
afterLatch))
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
                                 .build())
                 .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
index bb227552a3e..4a67d601a0c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsITCase.java
@@ -140,9 +140,8 @@ public class SinkV2MetricsITCase extends TestLogger {
         env.fromSequence(0, numCommittables - 1)
                 .returns(BasicTypeInfo.LONG_TYPE_INFO)
                 .sinkTo(
-                        TestSinkV2.<Long>newBuilder()
-                                .setCommitter(new MetricCommitter(beforeLatch, 
afterLatch))
-                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                        (TestSinkV2.<Long>newBuilder()
+                                        .setCommitter(new 
MetricCommitter(beforeLatch, afterLatch)))
                                 .build())
                 .name(TEST_SINK_NAME);
         JobClient jobClient = env.executeAsync();


Reply via email to