This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 580c4ae0e4e [FLINK-33975] Tests for the new Sink V2 transformations
580c4ae0e4e is described below

commit 580c4ae0e4e934133e5be415a740d9f7ffbbda2a
Author: pvary <[email protected]>
AuthorDate: Tue Jan 23 09:22:10 2024 +0100

    [FLINK-33975] Tests for the new Sink V2 transformations
---
 .../flink/api/connector/sink2/Committer.java       |   3 +-
 .../connector/sink2/CommittableWithLineage.java    |   3 +-
 .../datastream/DataStreamSinkDeprecatedTest.java   |  53 ++++
 ...V2TransformationTranslatorDeprecatedITCase.java |  99 ++++++
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 278 +++++++++++++++-
 .../operators/sink/CommitterOperatorTestBase.java  |  10 +-
 ... => SinkV2CommitterOperatorDeprecatedTest.java} |   7 +-
 .../sink/SinkV2CommitterOperatorTest.java          |  10 +-
 .../SinkV2SinkWriterOperatorDeprecatedTest.java    | 155 +++++++++
 .../runtime/operators/sink/TestSinkV2.java         |  97 ++++--
 .../sink/{ => deprecated}/TestSinkV2.java          |  22 +-
 ...deITCase.java => HashcodeDeprecatedITCase.java} |   8 +-
 .../table/planner/functions/HashcodeITCase.java    |   4 +-
 .../scheduling/SpeculativeSchedulerITCase.java     | 129 ++++++++
 ...nkV2ITCase.java => SinkV2DeprecatedITCase.java} |   8 +-
 .../flink/test/streaming/runtime/SinkV2ITCase.java |  50 +++
 .../runtime/SinkV2MetricsDeprecatedITCase.java     | 348 +++++++++++++++++++++
 17 files changed, 1234 insertions(+), 50 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
index c51cce2e3e9..5e4af3eae5c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
@@ -25,8 +25,7 @@ import java.util.Collection;
 
 /**
  * The {@code Committer} is responsible for committing the data staged by the 
{@link
- * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a 
two-phase commit
- * protocol.
+ * CommittingSinkWriter} in the second step of a two-phase commit protocol.
  *
  * <p>A commit must be idempotent: If some failure occurs in Flink during 
commit phase, Flink will
  * restart from previous checkpoint and re-attempt to commit all committables. 
Thus, some or all
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index a792a3ad48c..96683bfd2ca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 
 import javax.annotation.Nullable;
 
@@ -31,7 +30,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Provides metadata. The exposed exchange type between {@link
- * TwoPhaseCommittingSink.PrecommittingSinkWriter} and {@link Committer}.
+ * org.apache.flink.api.connector.sink2.CommittingSinkWriter} and {@link 
Committer}.
  */
 @Experimental
 public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT> {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
new file mode 100644
index 00000000000..45140ff567c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkDeprecatedTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link DataStreamSink}.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+public class DataStreamSinkDeprecatedTest {
+
+    @Test
+    public void testGettingTransformationWithNewSinkAPI() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final Transformation<?> transformation =
+                env.fromData(1, 2)
+                        .sinkTo(TestSinkV2.<Integer>newBuilder().build())
+                        .getTransformation();
+        assertTrue(transformation instanceof SinkTransformation);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void throwExceptionWhenSetUidWithNewSinkAPI() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromData(1, 
2).sinkTo(TestSinkV2.<Integer>newBuilder().build()).setUidHash("Test");
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
new file mode 100644
index 00000000000..9456b4f45a0
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorDeprecatedITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link 
org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or 
re-order test cases.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+@RunWith(Parameterized.class)
+public class SinkV2TransformationTranslatorDeprecatedITCase
+        extends SinkTransformationTranslatorITCaseBase<Sink<Integer>> {
+
+    @Override
+    Sink<Integer> simpleSink() {
+        return TestSinkV2.<Integer>newBuilder().build();
+    }
+
+    @Override
+    Sink<Integer> sinkWithCommitter() {
+        return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
+    }
+
+    @Override
+    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer> 
sink) {
+        return stream.sinkTo(sink);
+    }
+
+    @Test
+    public void testSettingOperatorUidHash() {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
+        final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
+        final CustomSinkOperatorUidHashes operatorsUidHashes =
+                CustomSinkOperatorUidHashes.builder()
+                        .setWriterUidHash(writerHash)
+                        .setCommitterUidHash(committerHash)
+                        .build();
+        src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+
+        assertEquals(findWriter(streamGraph).getUserHash(), writerHash);
+        assertEquals(findCommitter(streamGraph).getUserHash(), committerHash);
+    }
+
+    /**
+     * When ever you need to change something in this test case please think 
about possible state
+     * upgrade problems introduced by your changes.
+     */
+    @Test
+    public void testSettingOperatorUids() {
+        final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        final DataStreamSource<Integer> src = env.fromElements(1, 2);
+        src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid);
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertEquals(findWriter(streamGraph).getTransformationUID(), sinkUid);
+        assertEquals(
+                findCommitter(streamGraph).getTransformationUID(),
+                String.format("Sink Committer: %s", sinkUid));
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index d726c57df4e..1c716d4720a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -36,7 +36,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -70,6 +75,11 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
+import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
 import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
@@ -105,6 +115,7 @@ import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformatio
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -128,6 +139,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -2124,6 +2136,30 @@ class StreamingJobGraphGeneratorTest {
                 .hasRootCauseMessage("This provider is not serializable.");
     }
 
+    @Test
+    void testSinkWithAllInterfaces() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        final DataStream<Integer> source = env.fromData(1, 2, 
3).name("source");
+        source.rebalance().sinkTo(new 
TestSinkWithAllInterfaces()).name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(6);
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            assertThat(jobVertex.getName())
+                    .containsAnyOf(
+                            "source",
+                            "pre-writer",
+                            "Writer",
+                            "pre-committer",
+                            "post-committer",
+                            "Committer");
+        }
+    }
+
     @Test
     void testSupportConcurrentExecutionAttempts() {
         final StreamExecutionEnvironment env =
@@ -2208,6 +2244,41 @@ class StreamingJobGraphGeneratorTest {
         }
     }
 
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
+    @Test
+    void testSinkSupportConcurrentExecutionAttemptsWithDeprecatedSink() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new 
Configuration());
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        final DataStream<Integer> source = env.fromData(1, 2, 
3).name("source");
+        source.rebalance()
+                .sinkTo(new 
TestSinkWithSupportsConcurrentExecutionAttemptsDeprecated())
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(6);
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            if (jobVertex.getName().contains("source")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+            } else if (jobVertex.getName().contains("pre-writer")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+            } else if (jobVertex.getName().contains("Writer")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+            } else if (jobVertex.getName().contains("pre-committer")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else if (jobVertex.getName().contains("post-committer")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else if (jobVertex.getName().contains("Committer")) {
+                
assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else {
+                Assertions.fail("Unexpected job vertex " + 
jobVertex.getName());
+            }
+        }
+    }
+
     @Test
     void testSinkFunctionNotSupportConcurrentExecutionAttempts() {
         testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
@@ -2324,7 +2395,9 @@ class StreamingJobGraphGeneratorTest {
         public void invoke(T value, Context context) throws Exception {}
     }
 
-    private static class TestSinkWithSupportsConcurrentExecutionAttempts
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
+    private static class 
TestSinkWithSupportsConcurrentExecutionAttemptsDeprecated
             implements SupportsConcurrentExecutionAttempts,
                     TwoPhaseCommittingSink<Integer, Void>,
                     WithPreWriteTopology<Integer>,
@@ -2409,6 +2482,209 @@ class StreamingJobGraphGeneratorTest {
         }
     }
 
+    private static class TestSinkWithSupportsConcurrentExecutionAttempts
+            implements SupportsConcurrentExecutionAttempts,
+                    Sink<Integer>,
+                    SupportsCommitter<Void>,
+                    SupportsPreWriteTopology<Integer>,
+                    SupportsPreCommitTopology<Void, Void>,
+                    SupportsPostCommitTopology<Void> {
+
+        @Override
+        public SinkWriter<Integer> createWriter(InitContext context) throws 
IOException {
+            throw new UnsupportedOperationException("Not supported");
+        }
+
+        @Override
+        public SinkWriter<Integer> createWriter(WriterInitContext context) 
throws IOException {
+            return new CommittingSinkWriter<Integer, Void>() {
+                @Override
+                public Collection<Void> prepareCommit() throws IOException, 
InterruptedException {
+                    return null;
+                }
+
+                @Override
+                public void write(Integer element, Context context)
+                        throws IOException, InterruptedException {}
+
+                @Override
+                public void flush(boolean endOfInput) throws IOException, 
InterruptedException {}
+
+                @Override
+                public void close() throws Exception {}
+            };
+        }
+
+        @Override
+        public Committer<Void> createCommitter(CommitterInitContext context) 
throws IOException {
+            return new Committer<Void>() {
+                @Override
+                public void commit(Collection<CommitRequest<Void>> 
committables)
+                        throws IOException, InterruptedException {}
+
+                @Override
+                public void close() throws Exception {}
+            };
+        }
+
+        @Override
+        public SimpleVersionedSerializer<Void> getCommittableSerializer() {
+            return new SimpleVersionedSerializer<Void>() {
+                @Override
+                public int getVersion() {
+                    return 0;
+                }
+
+                @Override
+                public byte[] serialize(Void obj) throws IOException {
+                    return new byte[0];
+                }
+
+                @Override
+                public Void deserialize(int version, byte[] serialized) throws 
IOException {
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public void addPostCommitTopology(DataStream<CommittableMessage<Void>> 
committables) {
+            committables
+                    .map(v -> v)
+                    .name("post-committer")
+                    .returns(CommittableMessageTypeInfo.noOutput())
+                    .rebalance();
+        }
+
+        @Override
+        public DataStream<CommittableMessage<Void>> addPreCommitTopology(
+                DataStream<CommittableMessage<Void>> committables) {
+            return committables
+                    .map(v -> v)
+                    .name("pre-committer")
+                    .returns(CommittableMessageTypeInfo.noOutput())
+                    .rebalance();
+        }
+
+        @Override
+        public SimpleVersionedSerializer<Void> getWriteResultSerializer() {
+            return null;
+        }
+
+        @Override
+        public DataStream<Integer> addPreWriteTopology(DataStream<Integer> 
inputDataStream) {
+            return inputDataStream.map(v -> v).name("pre-writer").rebalance();
+        }
+    }
+
+    private static class TestSinkWithAllInterfaces
+            implements Sink<Integer>,
+                    SupportsPreWriteTopology<Integer>,
+                    SupportsCommitter<Long>,
+                    SupportsPreCommitTopology<String, Long>,
+                    SupportsPostCommitTopology<Long> {
+
+        @Override
+        @Deprecated
+        public SinkWriter<Integer> createWriter(InitContext context) throws 
IOException {
+            throw new UnsupportedOperationException("Not supported");
+        }
+
+        @Override
+        public CommittingSinkWriter<Integer, String> 
createWriter(WriterInitContext context)
+                throws IOException {
+            return new CommittingSinkWriter<Integer, String>() {
+                @Override
+                public Collection<String> prepareCommit() throws IOException, 
InterruptedException {
+                    return null;
+                }
+
+                @Override
+                public void write(Integer element, Context context)
+                        throws IOException, InterruptedException {}
+
+                @Override
+                public void flush(boolean endOfInput) throws IOException, 
InterruptedException {}
+
+                @Override
+                public void close() throws Exception {}
+            };
+        }
+
+        @Override
+        public Committer<Long> createCommitter(CommitterInitContext context) 
throws IOException {
+            return new Committer<Long>() {
+                @Override
+                public void commit(Collection<CommitRequest<Long>> 
committables)
+                        throws IOException, InterruptedException {}
+
+                @Override
+                public void close() throws Exception {}
+            };
+        }
+
+        @Override
+        public SimpleVersionedSerializer<Long> getCommittableSerializer() {
+            return new LongSerializer();
+        }
+
+        @Override
+        public void addPostCommitTopology(DataStream<CommittableMessage<Long>> 
committables) {
+            committables
+                    .map(v -> (CommittableMessage<Void>) null)
+                    .name("post-committer")
+                    .returns(CommittableMessageTypeInfo.noOutput())
+                    .rebalance();
+        }
+
+        @Override
+        public DataStream<CommittableMessage<Long>> addPreCommitTopology(
+                DataStream<CommittableMessage<String>> committables) {
+            return committables
+                    .map(
+                            v -> {
+                                if (v instanceof CommittableSummary) {
+                                    return (CommittableSummary<Long>)
+                                            ((CommittableSummary) v).map();
+                                } else {
+                                    CommittableWithLineage withLineage = 
(CommittableWithLineage) v;
+                                    return (CommittableWithLineage<Long>)
+                                            withLineage.map(old -> 
Long.valueOf(old.toString()));
+                                }
+                            })
+                    .name("pre-committer")
+                    
.returns(CommittableMessageTypeInfo.of(LongSerializer::new))
+                    .rebalance();
+        }
+
+        @Override
+        public SimpleVersionedSerializer<String> getWriteResultSerializer() {
+            return new TestSinkV2.StringSerializer();
+        }
+
+        @Override
+        public DataStream<Integer> addPreWriteTopology(DataStream<Integer> 
inputDataStream) {
+            return inputDataStream.map(v -> v).name("pre-writer").rebalance();
+        }
+    }
+
+    public static class LongSerializer implements 
SimpleVersionedSerializer<Long>, Serializable {
+        @Override
+        public int getVersion() {
+            return 0;
+        }
+
+        @Override
+        public byte[] serialize(Long obj) throws IOException {
+            return new byte[0];
+        }
+
+        @Override
+        public Long deserialize(int version, byte[] serialized) throws 
IOException {
+            return null;
+        }
+    }
+
     private static class SerializationTestOperatorFactory
             extends AbstractStreamOperatorFactory<Integer>
             implements CoordinatedOperatorFactory<Integer> {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index fe3625ec8ce..d1269ed89c1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -340,7 +340,7 @@ abstract class CommitterOperatorTestBase {
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
-                    TwoPhaseCommittingSink<?, String> sink,
+                    SupportsCommitter<String> sink,
                     boolean isBatchMode,
                     boolean isCheckpointingEnabled)
                     throws Exception {
@@ -351,7 +351,7 @@ abstract class CommitterOperatorTestBase {
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
             createTestHarness(
-                    TwoPhaseCommittingSink<?, String> sink,
+                    SupportsCommitter<String> sink,
                     boolean isBatchMode,
                     boolean isCheckpointingEnabled,
                     int maxParallelism,
@@ -372,10 +372,10 @@ abstract class CommitterOperatorTestBase {
     abstract SinkAndCounters sinkWithoutPostCommit();
 
     static class SinkAndCounters {
-        TwoPhaseCommittingSink<?, String> sink;
+        SupportsCommitter<String> sink;
         IntSupplier commitCounter;
 
-        public SinkAndCounters(TwoPhaseCommittingSink<?, String> sink, 
IntSupplier commitCounter) {
+        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier 
commitCounter) {
             this.sink = sink;
             this.commitCounter = commitCounter;
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
similarity index 91%
copy from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
copy to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
index ed8e53ff342..51d2fa0d8d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorDeprecatedTest.java
@@ -19,10 +19,15 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
 
 import java.util.Collection;
 
-class SinkV2CommitterOperatorTest extends CommitterOperatorTestBase {
+/**
+ * Should be removed along with {@link 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+class SinkV2CommitterOperatorDeprecatedTest extends CommitterOperatorTestBase {
     @Override
     SinkAndCounters sinkWithPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index ed8e53ff342..349e3961e43 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 
 import java.util.Collection;
 
@@ -27,7 +27,7 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
     SinkAndCounters sinkWithPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+                (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(committer)
                                 
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
@@ -38,8 +38,8 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
 
     @Override
     SinkAndCounters sinkWithPostCommitWithRetry() {
-        return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+        return new CommitterOperatorTestBase.SinkAndCounters(
+                (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(new 
TestSinkV2.RetryOnceCommitter())
                                 
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
@@ -52,7 +52,7 @@ class SinkV2CommitterOperatorTest extends 
CommitterOperatorTestBase {
     SinkAndCounters sinkWithoutPostCommit() {
         ForwardingCommitter committer = new ForwardingCommitter();
         return new SinkAndCounters(
-                (TwoPhaseCommittingSink<?, String>)
+                (SupportsCommitter<String>)
                         TestSinkV2.newBuilder()
                                 .setCommitter(committer)
                                 
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
new file mode 100644
index 00000000000..bf06065b17c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorDeprecatedTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Should be removed along with {@link 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
+class SinkV2SinkWriterOperatorDeprecatedTest extends 
SinkWriterOperatorTestBase {
+
+    @Override
+    SinkAndSuppliers sinkWithoutCommitter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TestSinkV2.DefaultSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder().setWriter(sinkWriter).build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithCommitter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter =
+                new TestSinkV2.DefaultCommittingSinkWriter<>();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithTimeBasedWriter() {
+        TestSinkV2.DefaultSinkWriter<Integer> sinkWriter = new 
TimeBasedBufferingSinkWriter();
+        return new SinkAndSuppliers(
+                TestSinkV2.<Integer>newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> -1,
+                TestSinkV2.StringSerializer::new);
+    }
+
+    @Override
+    SinkAndSuppliers sinkWithSnapshottingWriter(boolean withState, String 
stateName) {
+        SnapshottingBufferingSinkWriter sinkWriter = new 
SnapshottingBufferingSinkWriter();
+        TestSinkV2.Builder<Integer> builder =
+                TestSinkV2.newBuilder()
+                        .setWriter(sinkWriter)
+                        .setDefaultCommitter()
+                        .setWithPostCommitTopology(true);
+        if (withState) {
+            builder.setWriterState(true);
+        }
+        if (stateName != null) {
+            builder.setCompatibleStateNames(stateName);
+        }
+        return new SinkAndSuppliers(
+                builder.build(),
+                () -> sinkWriter.elements,
+                () -> sinkWriter.watermarks,
+                () -> sinkWriter.lastCheckpointId,
+                () -> new TestSinkV2.StringSerializer());
+    }
+
+    private static class TimeBasedBufferingSinkWriter
+            extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
+            implements ProcessingTimeService.ProcessingTimeCallback {
+
+        private final List<String> cachedCommittables = new ArrayList<>();
+        private ProcessingTimeService processingTimeService;
+
+        @Override
+        public void write(Integer element, Context context) {
+            cachedCommittables.add(
+                    Tuple3.of(element, context.timestamp(), 
context.currentWatermark()).toString());
+        }
+
+        @Override
+        public void onProcessingTime(long time) {
+            elements.addAll(cachedCommittables);
+            cachedCommittables.clear();
+            this.processingTimeService.registerTimer(time + 1000, this);
+        }
+
+        @Override
+        public void init(Sink.InitContext context) {
+            this.processingTimeService = context.getProcessingTimeService();
+            this.processingTimeService.registerTimer(1000, this);
+        }
+    }
+
+    private static class SnapshottingBufferingSinkWriter
+            extends TestSinkV2.DefaultStatefulSinkWriter {
+        public static final int NOT_SNAPSHOTTED = -1;
+        long lastCheckpointId = NOT_SNAPSHOTTED;
+        boolean endOfInput = false;
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            this.endOfInput = endOfInput;
+        }
+
+        @Override
+        public List<String> snapshotState(long checkpointId) throws 
IOException {
+            lastCheckpointId = checkpointId;
+            return super.snapshotState(checkpointId);
+        }
+
+        @Override
+        public Collection<String> prepareCommit() {
+            if (!endOfInput) {
+                return ImmutableList.of();
+            }
+            List<String> result = elements;
+            elements = new ArrayList<>();
+            return result;
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
index 66fd323d1ff..e13e5881d71 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
@@ -20,14 +20,22 @@ package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
+import 
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.util.Preconditions;
@@ -51,7 +59,7 @@ import java.util.function.Supplier;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertNotNull;
 
-/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink 
related tests. */
+/** A {@link Sink} for all the sink related tests. */
 public class TestSinkV2<InputT> implements Sink<InputT> {
 
     private final DefaultSinkWriter<InputT> writer;
@@ -79,6 +87,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         private DefaultCommitter committer;
         private SimpleVersionedSerializer<String> committableSerializer;
         private boolean withPostCommitTopology = false;
+        private boolean withPreCommitTopology = false;
         private boolean withWriterState = false;
         private String compatibleStateNames;
 
@@ -116,6 +125,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
             return this;
         }
 
+        public Builder<InputT> setWithPreCommitTopology(boolean 
withPreCommitTopology) {
+            this.withPreCommitTopology = withPreCommitTopology;
+            return this;
+        }
+
         public Builder<InputT> setWriterState(boolean withWriterState) {
             this.withWriterState = withWriterState;
             return this;
@@ -138,9 +152,21 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
                     writer = new DefaultCommittingSinkWriter<>();
                 }
                 if (!withPostCommitTopology) {
-                    // TwoPhaseCommittingSink with a stateless writer and a 
committer
-                    return new TestSinkV2TwoPhaseCommittingSink<>(
-                            writer, committableSerializer, committer);
+                    if (!withPreCommitTopology) {
+                        // TwoPhaseCommittingSink with a stateless writer and 
a committer
+                        return new TestSinkV2TwoPhaseCommittingSink<>(
+                                writer, committableSerializer, committer);
+                    } else {
+                        // TwoPhaseCommittingSink with a stateless writer, pre 
commit topology,
+                        // committer
+                        Preconditions.checkArgument(
+                                writer instanceof DefaultCommittingSinkWriter,
+                                "Please provide a DefaultCommittingSinkWriter 
instance");
+                        return new TestSinkV2WithPreCommitTopology<>(
+                                (DefaultCommittingSinkWriter) writer,
+                                committableSerializer,
+                                committer);
+                    }
                 } else {
                     if (withWriterState) {
                         // TwoPhaseCommittingSink with a stateful writer and a 
committer and post
@@ -170,7 +196,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     private static class TestSinkV2TwoPhaseCommittingSink<InputT> extends 
TestSinkV2<InputT>
-            implements TwoPhaseCommittingSink<InputT, String> {
+            implements SupportsCommitter<String> {
         private final DefaultCommitter committer;
         private final SimpleVersionedSerializer<String> committableSerializer;
 
@@ -184,7 +210,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public Committer<String> createCommitter() {
+        public Committer<String> createCommitter(CommitterInitContext context) 
{
             committer.init();
             return committer;
         }
@@ -195,8 +221,8 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
 
         @Override
-        public PrecommittingSinkWriter<InputT, String> 
createWriter(InitContext context) {
-            return (PrecommittingSinkWriter<InputT, String>) 
super.createWriter(context);
+        public SinkWriter<InputT> createWriter(WriterInitContext context) 
throws IOException {
+            return super.createWriter(context);
         }
     }
 
@@ -204,7 +230,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
     private static class TestSinkV2WithPostCommitTopology<InputT>
             extends TestSinkV2TwoPhaseCommittingSink<InputT>
-            implements WithPostCommitTopology<InputT, String> {
+            implements SupportsPostCommitTopology<String> {
         public TestSinkV2WithPostCommitTopology(
                 DefaultSinkWriter<InputT> writer,
                 SimpleVersionedSerializer<String> committableSerializer,
@@ -218,8 +244,42 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
     }
 
+    private static class TestSinkV2WithPreCommitTopology<InputT>
+            extends TestSinkV2TwoPhaseCommittingSink<InputT>
+            implements SupportsPreCommitTopology<String, String> {
+        public TestSinkV2WithPreCommitTopology(
+                DefaultSinkWriter<InputT> writer,
+                SimpleVersionedSerializer<String> committableSerializer,
+                DefaultCommitter committer) {
+            super(writer, committableSerializer, committer);
+        }
+
+        @Override
+        public DataStream<CommittableMessage<String>> addPreCommitTopology(
+                DataStream<CommittableMessage<String>> committables) {
+            return committables
+                    .map(
+                            m -> {
+                                if (m instanceof CommittableSummary) {
+                                    return m;
+                                } else {
+                                    CommittableWithLineage<String> withLineage 
=
+                                            (CommittableWithLineage<String>) m;
+                                    return withLineage.map(old -> old + 
"Transformed");
+                                }
+                            })
+                    
.returns(CommittableMessageTypeInfo.of(StringSerializer::new));
+        }
+
+        @Override
+        public SimpleVersionedSerializer<String> getWriteResultSerializer() {
+            return new StringSerializer();
+        }
+    }
+
     private static class TestStatefulSinkV2<InputT> extends 
TestSinkV2WithPostCommitTopology<InputT>
-            implements StatefulSink<InputT, String>, 
StatefulSink.WithCompatibleState {
+            implements SupportsWriterState<InputT, String>,
+                    SupportsWriterState.WithCompatibleState {
         private String compatibleState;
 
         public TestStatefulSinkV2(
@@ -238,7 +298,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
 
         @Override
         public StatefulSinkWriter<InputT, String> restoreWriter(
-                InitContext context, Collection<String> recoveredState) {
+                WriterInitContext context, Collection<String> recoveredState) {
             DefaultStatefulSinkWriter<InputT> statefulWriter =
                     (DefaultStatefulSinkWriter) getWriter();
 
@@ -297,10 +357,9 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
         }
     }
 
-    /** Base class for out testing {@link 
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
+    /** Base class for out testing {@link CommittingSinkWriter}. */
     protected static class DefaultCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
-            implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, 
String>,
-                    Serializable {
+            implements CommittingSinkWriter<InputT, String>, Serializable {
 
         @Override
         public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
@@ -316,12 +375,12 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     /**
-     * Base class for out testing {@link StatefulSink.StatefulSinkWriter}. 
Extends the {@link
+     * Base class for out testing {@link StatefulSinkWriter}. Extends the 
{@link
      * DefaultCommittingSinkWriter} for simplicity.
      */
     protected static class DefaultStatefulSinkWriter<InputT>
             extends DefaultCommittingSinkWriter<InputT>
-            implements StatefulSink.StatefulSinkWriter<InputT, String> {
+            implements StatefulSinkWriter<InputT, String> {
 
         @Override
         public List<String> snapshotState(long checkpointId) throws 
IOException {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
similarity index 96%
copy from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
copy to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
index 66fd323d1ff..72b2810d1b4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/deprecated/TestSinkV2.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.sink;
+package org.apache.flink.streaming.runtime.operators.sink.deprecated;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink2.Committer;
@@ -51,7 +51,13 @@ import java.util.function.Supplier;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertNotNull;
 
-/** A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink 
related tests. */
+/**
+ * A {@link org.apache.flink.api.connector.sink2.Sink} for all the sink 
related tests.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}.
+ */
+@Deprecated
 public class TestSinkV2<InputT> implements Sink<InputT> {
 
     private final DefaultSinkWriter<InputT> writer;
@@ -262,11 +268,11 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     /** Base class for out testing {@link SinkWriter}. */
     public static class DefaultSinkWriter<InputT> implements 
SinkWriter<InputT>, Serializable {
 
-        protected List<String> elements;
+        public List<String> elements;
 
-        protected List<Watermark> watermarks;
+        public List<Watermark> watermarks;
 
-        protected DefaultSinkWriter() {
+        public DefaultSinkWriter() {
             this.elements = new ArrayList<>();
             this.watermarks = new ArrayList<>();
         }
@@ -298,7 +304,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     /** Base class for out testing {@link 
TwoPhaseCommittingSink.PrecommittingSinkWriter}. */
-    protected static class DefaultCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
+    public static class DefaultCommittingSinkWriter<InputT> extends 
DefaultSinkWriter<InputT>
             implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, 
String>,
                     Serializable {
 
@@ -319,7 +325,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
      * Base class for out testing {@link StatefulSink.StatefulSinkWriter}. 
Extends the {@link
      * DefaultCommittingSinkWriter} for simplicity.
      */
-    protected static class DefaultStatefulSinkWriter<InputT>
+    public static class DefaultStatefulSinkWriter<InputT>
             extends DefaultCommittingSinkWriter<InputT>
             implements StatefulSink.StatefulSinkWriter<InputT, String> {
 
@@ -387,7 +393,7 @@ public class TestSinkV2<InputT> implements Sink<InputT> {
     }
 
     /** A {@link Committer} that always re-commits the committables data it 
received. */
-    static class RetryOnceCommitter extends DefaultCommitter {
+    public static class RetryOnceCommitter extends DefaultCommitter {
 
         private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
similarity index 97%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
index a0d64adb821..ac10d55a3b9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeDeprecatedITCase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.functions;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -61,8 +61,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for verifying runtime behaviour of {@link 
BuiltInFunctionDefinitions#INTERNAL_HASHCODE}.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}
  */
-public class HashcodeITCase {
+@Deprecated
+public class HashcodeDeprecatedITCase {
 
     @RegisterExtension
     private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new 
MiniClusterExtension();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
index a0d64adb821..8eaf4a277f7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/HashcodeITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
-import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
@@ -158,8 +157,7 @@ public class HashcodeITCase {
         }
 
         @Override
-        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
-                ScanTableSource.ScanContext context) {
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) 
{
             return SourceFunctionProvider.of(new TestSourceFunction(), false);
         }
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index 6f02b41c573..b326aa824b0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -29,8 +29,13 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -188,6 +193,21 @@ class SpeculativeSchedulerITCase {
         assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
     }
 
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
+    @Test
+    public void testSpeculativeSlowSinkDeprecated() throws Exception {
+        executeJob(this::setupSpeculativeSlowSinkDeprecated);
+        waitUntilJobArchived();
+
+        checkResults();
+
+        // no speculative executions for committer
+        assertThat(DummyCommitter.attempts.get()).isEqualTo(parallelism);
+        // there is a speculative execution for writer
+        assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
+    }
+
     @Test
     public void testNonSpeculativeSlowSinkFunction() throws Exception {
         executeJob(this::setupNonSpeculativeSlowSinkFunction);
@@ -343,6 +363,9 @@ class SpeculativeSchedulerITCase {
     }
 
     private void setupSpeculativeSlowSink(StreamExecutionEnvironment env) {
+        DummyCommitter.attempts.set(0);
+        DummyCommitter.blocked.set(false);
+        DummyCommitter.foundSpeculativeWriter = false;
         final DataStream<Long> source =
                 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                         .setParallelism(parallelism)
@@ -354,6 +377,23 @@ class SpeculativeSchedulerITCase {
                 .slotSharingGroup("sinkGroup");
     }
 
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
+    private void setupSpeculativeSlowSinkDeprecated(StreamExecutionEnvironment 
env) {
+        DummyCommitter.attempts.set(0);
+        DummyCommitter.blocked.set(false);
+        DummyCommitter.foundSpeculativeWriter = false;
+        final DataStream<Long> source =
+                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                        .setParallelism(parallelism)
+                        .name("source")
+                        .slotSharingGroup("sourceGroup");
+        source.sinkTo(new SpeculativeSinkDeprecated())
+                .setParallelism(parallelism)
+                .name("sink")
+                .slotSharingGroup("sinkGroup");
+    }
+
     private void 
setupNonSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) {
         final DataStream<Long> source =
                 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
@@ -524,6 +564,93 @@ class SpeculativeSchedulerITCase {
     }
 
     private static class SpeculativeSink
+            implements Sink<Long>,
+                    SupportsCommitter<Tuple3<Integer, Integer, Map<Long, 
Long>>>,
+                    SupportsConcurrentExecutionAttempts {
+
+        @Override
+        public SinkWriter<Long> createWriter(InitContext context) {
+            throw new UnsupportedOperationException("Not supported");
+        }
+
+        @Override
+        public CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, 
Long>>> createWriter(
+                WriterInitContext context) {
+            return new DummyCommittingSinkWriter(
+                    context.getTaskInfo().getIndexOfThisSubtask(),
+                    context.getTaskInfo().getAttemptNumber());
+        }
+
+        @Override
+        public Committer<Tuple3<Integer, Integer, Map<Long, Long>>> 
createCommitter(
+                CommitterInitContext context) {
+            return new DummyCommitter();
+        }
+
+        @Override
+        public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, 
Long>>>
+                getCommittableSerializer() {
+            return new SimpleVersionedSerializer<Tuple3<Integer, Integer, 
Map<Long, Long>>>() {
+                @Override
+                public int getVersion() {
+                    return 0;
+                }
+
+                @Override
+                public byte[] serialize(Tuple3<Integer, Integer, Map<Long, 
Long>> obj)
+                        throws IOException {
+                    return InstantiationUtil.serializeObject(obj);
+                }
+
+                @Override
+                public Tuple3<Integer, Integer, Map<Long, Long>> deserialize(
+                        int version, byte[] serialized) throws IOException {
+                    try {
+                        return InstantiationUtil.deserializeObject(
+                                serialized, 
Thread.currentThread().getContextClassLoader());
+                    } catch (ClassNotFoundException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+        }
+    }
+
+    private static class DummyCommittingSinkWriter
+            implements CommittingSinkWriter<Long, Tuple3<Integer, Integer, 
Map<Long, Long>>> {
+
+        private final int subTaskIndex;
+
+        private final int attemptNumber;
+
+        public DummyCommittingSinkWriter(int subTaskIndex, int attemptNumber) {
+            this.subTaskIndex = subTaskIndex;
+            this.attemptNumber = attemptNumber;
+        }
+
+        private final Map<Long, Long> numberCountResult = new HashMap<>();
+
+        @Override
+        public void write(Long value, Context context) throws IOException, 
InterruptedException {
+            numberCountResult.merge(value, 1L, Long::sum);
+            maybeSleep();
+        }
+
+        @Override
+        public void flush(boolean endOfInput) {}
+
+        @Override
+        public Collection<Tuple3<Integer, Integer, Map<Long, Long>>> 
prepareCommit() {
+            return Collections.singleton(Tuple3.of(subTaskIndex, 
attemptNumber, numberCountResult));
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
+    private static class SpeculativeSinkDeprecated
             implements TwoPhaseCommittingSink<Long, Tuple3<Integer, Integer, 
Map<Long, Long>>>,
                     SupportsConcurrentExecutionAttempts {
 
@@ -570,6 +697,8 @@ class SpeculativeSchedulerITCase {
         }
     }
 
+    /** Should be removed along {@link TwoPhaseCommittingSink}. */
+    @Deprecated
     private static class DummyPrecommittingSinkWriter
             implements PrecommittingSinkWriter<Long, Tuple3<Integer, Integer, 
Map<Long, Long>>> {
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
similarity index 95%
copy from 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
copy to 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
index b23dc34ed74..a101fbe4119 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2DeprecatedITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
 
@@ -44,8 +44,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run 
time implementation.
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}
  */
-public class SinkV2ITCase extends AbstractTestBase {
+@Deprecated
+public class SinkV2DeprecatedITCase extends AbstractTestBase {
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 
630, 682, 765, 434, 970,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index b23dc34ed74..f44ab39975b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -103,6 +103,32 @@ public class SinkV2ITCase extends AbstractTestBase {
                 
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
     }
 
+    @Test
+    public void writerAndPrecommitToplogyAndCommitterExecuteInStreamingMode() 
throws Exception {
+        final StreamExecutionEnvironment env = buildStreamEnv();
+        final FiniteTestSource<Integer> source =
+                new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
+
+        env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .setWithPreCommitTopology(true)
+                                .build());
+        env.execute();
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(
+                        EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.stream()
+                                .map(s -> s + "Transformed")
+                                .toArray()));
+    }
+
     @Test
     public void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
@@ -123,6 +149,30 @@ public class SinkV2ITCase extends AbstractTestBase {
                 
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.toArray()));
     }
 
+    @Test
+    public void writerAndPrecommitToplogyAndCommitterExecuteInBatchMode() 
throws Exception {
+        final StreamExecutionEnvironment env = buildBatchEnv();
+
+        env.fromData(SOURCE_DATA)
+                .sinkTo(
+                        TestSinkV2.<Integer>newBuilder()
+                                .setDefaultCommitter(
+                                        
(Supplier<Queue<Committer.CommitRequest<String>>>
+                                                        & Serializable)
+                                                () -> COMMIT_QUEUE)
+                                .setWithPreCommitTopology(true)
+                                .build());
+        env.execute();
+        assertThat(
+                COMMIT_QUEUE.stream()
+                        .map(Committer.CommitRequest::getCommittable)
+                        .collect(Collectors.toList()),
+                containsInAnyOrder(
+                        EXPECTED_COMMITTED_DATA_IN_BATCH_MODE.stream()
+                                .map(s -> s + "Transformed")
+                                .toArray()));
+    }
+
     private StreamExecutionEnvironment buildStreamEnv() {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
new file mode 100644
index 00000000000..3dea88bcf65
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2MetricsDeprecatedITCase.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.operators.sink.deprecated.TestSinkV2;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatCounter;
+import static 
org.apache.flink.metrics.testutils.MetricAssertions.assertThatGauge;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasEntry;
+
+/**
+ * Tests whether all provided metrics of a {@link Sink} are of the expected 
values (FLIP-33).
+ *
+ * <p>Should be removed along with {@link
+ * org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink}
+ */
+@Deprecated
+public class SinkV2MetricsDeprecatedITCase extends TestLogger {
+
+    private static final String TEST_SINK_NAME = "MetricTestSink";
+    // please refer to SinkTransformationTranslator#WRITER_NAME
+    private static final String DEFAULT_WRITER_NAME = "Writer";
+    private static final String DEFAULT_COMMITTER_NAME = "Committer";
+    private static final int DEFAULT_PARALLELISM = 4;
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    private final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+                            .build());
+
+    @Test
+    public void testMetrics() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        int numSplits = Math.max(1, env.getParallelism() - 2);
+
+        int numRecordsPerSplit = 10;
+
+        // make sure all parallel instances have processed the same amount of 
records before
+        // validating metrics
+        SharedReference<CyclicBarrier> beforeBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        SharedReference<CyclicBarrier> afterBarrier =
+                sharedObjects.add(new CyclicBarrier(numSplits + 1));
+        int stopAtRecord1 = 4;
+        int stopAtRecord2 = numRecordsPerSplit - 1;
+
+        env.fromSequence(0, numSplits - 1)
+                .<Long>flatMap(
+                        (split, collector) ->
+                                LongStream.range(0, 
numRecordsPerSplit).forEach(collector::collect))
+                .returns(BasicTypeInfo.LONG_TYPE_INFO)
+                .map(
+                        i -> {
+                            if (i % numRecordsPerSplit == stopAtRecord1
+                                    || i % numRecordsPerSplit == 
stopAtRecord2) {
+                                beforeBarrier.get().await();
+                                afterBarrier.get().await();
+                            }
+                            return i;
+                        })
+                .sinkTo(TestSinkV2.<Long>newBuilder().setWriter(new 
MetricWriter()).build())
+                .name(TEST_SINK_NAME);
+        JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
+
+        beforeBarrier.get().await();
+        assertSinkMetrics(jobId, stopAtRecord1, numSplits);
+        afterBarrier.get().await();
+
+        beforeBarrier.get().await();
+        assertSinkMetrics(jobId, stopAtRecord2, numSplits);
+        afterBarrier.get().await();
+
+        jobClient.getJobExecutionResult().get();
+    }
+
+    @Test
+    public void testCommitterMetrics() throws Exception {
+        final int numCommittables = 7;
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // make sure all parallel instances have processed the records once 
before validating
+        // metrics
+        SharedReference<CountDownLatch> beforeLatch =
+                sharedObjects.add(new CountDownLatch(numCommittables));
+        SharedReference<CountDownLatch> afterLatch = sharedObjects.add(new 
CountDownLatch(1));
+
+        env.fromSequence(0, numCommittables - 1)
+                .returns(BasicTypeInfo.LONG_TYPE_INFO)
+                .sinkTo(
+                        TestSinkV2.<Long>newBuilder()
+                                .setCommitter(new MetricCommitter(beforeLatch, 
afterLatch))
+                                
.setCommittableSerializer(TestSinkV2.StringSerializer.INSTANCE)
+                                .build())
+                .name(TEST_SINK_NAME);
+        JobClient jobClient = env.executeAsync();
+        final JobID jobId = jobClient.getJobID();
+
+        // Run until every committer finished with 1 commit round - everything 
should be retried and
+        // pending
+        beforeLatch.get().await();
+        assertSinkCommitterMetrics(
+                jobId,
+                ImmutableMap.of(
+                        MetricNames.ALREADY_COMMITTED_COMMITTABLES, 0L,
+                        MetricNames.FAILED_COMMITTABLES, 0L,
+                        MetricNames.RETRIED_COMMITTABLES, 7L,
+                        MetricNames.SUCCESSFUL_COMMITTABLES, 0L,
+                        MetricNames.TOTAL_COMMITTABLES, 7L,
+                        MetricNames.PENDING_COMMITTABLES, 7L));
+        afterLatch.get().countDown();
+
+        // Run until finished
+        jobClient.getJobExecutionResult().get();
+        assertSinkCommitterMetrics(
+                jobId,
+                ImmutableMap.of(
+                        MetricNames.ALREADY_COMMITTED_COMMITTABLES, 1L,
+                        MetricNames.FAILED_COMMITTABLES, 2L,
+                        MetricNames.RETRIED_COMMITTABLES, 10L,
+                        MetricNames.SUCCESSFUL_COMMITTABLES, 4L,
+                        MetricNames.TOTAL_COMMITTABLES, 7L,
+                        MetricNames.PENDING_COMMITTABLES, 0L));
+    }
+
+    @SuppressWarnings("checkstyle:WhitespaceAfter")
+    private void assertSinkMetrics(JobID jobId, long 
processedRecordsPerSubtask, int numSplits) {
+        List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_WRITER_NAME);
+
+        int subtaskWithMetrics = 0;
+        for (OperatorMetricGroup group : groups) {
+            Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
+            // There are only 2 splits assigned; so two groups will not update 
metrics.
+            if (group.getIOMetricGroup() == null
+                    || group.getIOMetricGroup().getNumRecordsOutCounter() == 
null
+                    || 
group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) {
+                continue;
+            }
+            subtaskWithMetrics++;
+
+            // SinkWriterMetricGroup metrics
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_RECORDS_OUT))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.IO_NUM_BYTES_OUT))
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+            // MetricWriter is just incrementing errors every even record
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS))
+                    .isEqualTo((processedRecordsPerSubtask + 1) / 2);
+
+            // Test "send" metric series has the same value as "out" metric 
series.
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND))
+                    .isEqualTo(processedRecordsPerSubtask);
+            assertThatCounter(metrics.get(MetricNames.NUM_BYTES_SEND))
+                    .isEqualTo(processedRecordsPerSubtask * 
MetricWriter.RECORD_SIZE_IN_BYTES);
+            assertThatCounter(metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS))
+                    .isEqualTo((processedRecordsPerSubtask + 1) / 2);
+
+            // check if the latest send time is fetched
+            assertThatGauge(metrics.get(MetricNames.CURRENT_SEND_TIME))
+                    .isEqualTo((processedRecordsPerSubtask - 1) * 
MetricWriter.BASE_SEND_TIME);
+        }
+        assertThat(subtaskWithMetrics, equalTo(numSplits));
+    }
+
+    private void assertSinkCommitterMetrics(JobID jobId, Map<String, Long> 
expected) {
+        List<OperatorMetricGroup> groups =
+                reporter.findOperatorMetricGroups(
+                        jobId, TEST_SINK_NAME + ": " + DEFAULT_COMMITTER_NAME);
+
+        Map<String, Long> aggregated = new HashMap<>(6);
+        for (OperatorMetricGroup group : groups) {
+            Map<String, Metric> metrics = reporter.getMetricsByGroup(group);
+
+            for (String metricName :
+                    Arrays.asList(
+                            MetricNames.SUCCESSFUL_COMMITTABLES,
+                            MetricNames.ALREADY_COMMITTED_COMMITTABLES,
+                            MetricNames.RETRIED_COMMITTABLES,
+                            MetricNames.FAILED_COMMITTABLES,
+                            MetricNames.TOTAL_COMMITTABLES)) {
+                final Counter counter = (Counter) metrics.get(metricName);
+                if (counter != null) {
+                    aggregated.merge(metricName, counter.getCount(), 
Long::sum);
+                }
+            }
+
+            Gauge<Integer> pendingMetrics =
+                    (Gauge<Integer>) 
metrics.get(MetricNames.PENDING_COMMITTABLES);
+            if (pendingMetrics != null && pendingMetrics.getValue() != null) {
+                aggregated.merge(
+                        MetricNames.PENDING_COMMITTABLES,
+                        pendingMetrics.getValue().longValue(),
+                        Long::sum);
+            }
+        }
+
+        expected.entrySet()
+                .forEach(e -> assertThat(aggregated, hasEntry(e.getKey(), 
e.getValue())));
+    }
+
+    private static class MetricWriter extends 
TestSinkV2.DefaultSinkWriter<Long> {
+        static final long BASE_SEND_TIME = 100;
+        static final long RECORD_SIZE_IN_BYTES = 10;
+        private SinkWriterMetricGroup metricGroup;
+        private long sendTime;
+
+        @Override
+        public void init(Sink.InitContext context) {
+            this.metricGroup = context.metricGroup();
+            metricGroup.setCurrentSendTimeGauge(() -> sendTime);
+        }
+
+        @Override
+        public void write(Long element, Context context) {
+            super.write(element, context);
+            sendTime = element * BASE_SEND_TIME;
+            metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
+            if (element % 2 == 0) {
+                metricGroup.getNumRecordsOutErrorsCounter().inc();
+            }
+            
metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES);
+        }
+    }
+
+    private static class MetricCommitter extends TestSinkV2.DefaultCommitter {
+        private int counter = 0;
+        private SharedReference<CountDownLatch> beforeLatch;
+        private SharedReference<CountDownLatch> afterLatch;
+
+        MetricCommitter(
+                SharedReference<CountDownLatch> beforeLatch,
+                SharedReference<CountDownLatch> afterLatch) {
+            this.beforeLatch = beforeLatch;
+            this.afterLatch = afterLatch;
+            this.counter = 0;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            if (counter == 0) {
+                System.err.println(
+                        "Committables arrived "
+                                + Thread.currentThread().getName()
+                                + " "
+                                + committables.stream()
+                                        .map(c -> c.getCommittable())
+                                        .collect(Collectors.toList()));
+                committables.forEach(c -> c.retryLater());
+            } else {
+                if (counter == 1) {
+                    // Wait for metrics check before continue
+                    try {
+                        committables.forEach(any -> 
beforeLatch.get().countDown());
+                        afterLatch.get().await();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                committables.forEach(
+                        c -> {
+                            switch (c.getCommittable().charAt(1)) {
+                                case '0':
+                                    c.signalAlreadyCommitted();
+                                    // 1 already committed
+                                    break;
+                                case '1':
+                                case '2':
+                                    // 2 failed
+                                    c.signalFailedWithKnownReason(new 
RuntimeException());
+                                    break;
+                                case '3':
+                                    // Retry without change
+                                    if (counter == 1) {
+                                        c.retryLater();
+                                    }
+                                    break;
+                                case '4':
+                                case '5':
+                                    // Retry with change
+                                    c.updateAndRetryLater("Retry-" + 
c.getCommittable());
+                            }
+                        });
+            }
+            counter++;
+        }
+    }
+}

Reply via email to