This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b28bf47a3 Revert "[FLINK-33973] Add new interfaces for SinkV2 to 
synchronize the API with the SourceV2 API"
f0b28bf47a3 is described below

commit f0b28bf47a31a7403d90da8d43cfbec2bc6c4869
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Jan 12 11:36:29 2024 +0100

    Revert "[FLINK-33973] Add new interfaces for SinkV2 to synchronize the API 
with the SourceV2 API"
    
    This reverts commit 6afe98daf6190a77a67a1fa8ac8f12337a75f8e7.
---
 .../api/connector/sink2/CommitterInitContext.java  | 29 -------
 .../api/connector/sink2/CommittingSinkWriter.java  | 39 ---------
 .../org/apache/flink/api/connector/sink2/Sink.java | 99 +---------------------
 .../flink/api/connector/sink2/StatefulSink.java    | 52 +++++++-----
 .../api/connector/sink2/StatefulSinkWriter.java    | 39 ---------
 .../api/connector/sink2/SupportsCommitter.java     | 55 ------------
 .../api/connector/sink2/SupportsWriterState.java   | 73 ----------------
 .../connector/sink2/TwoPhaseCommittingSink.java    | 43 ++++++++--
 .../api/connector/sink2/WriterInitContext.java     | 85 -------------------
 .../api/connector/sink2/CommittableSummary.java    | 10 ---
 .../connector/sink2/CommittableWithLineage.java    |  5 --
 .../sink2/SupportsPostCommitTopology.java          | 47 ----------
 .../connector/sink2/SupportsPreCommitTopology.java | 49 -----------
 .../connector/sink2/SupportsPreWriteTopology.java  | 37 --------
 .../connector/sink2/WithPostCommitTopology.java    | 23 +++--
 .../api/connector/sink2/WithPreCommitTopology.java | 23 ++---
 .../api/connector/sink2/WithPreWriteTopology.java  | 21 +++--
 .../api/transformations/SinkV1Adapter.java         |  1 -
 .../runtime/operators/sink/CommitterOperator.java  |  2 +-
 .../runtime/operators/sink/SinkWriterOperator.java |  7 +-
 .../operators/sink/SinkWriterStateHandler.java     |  6 +-
 .../sink/StatefulSinkWriterStateHandler.java       | 29 ++-----
 .../sink/StatelessSinkWriterStateHandler.java      |  4 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  1 -
 .../flink/streaming/util/TestExpandingSink.java    |  1 -
 .../scheduling/SpeculativeSchedulerITCase.java     |  1 -
 26 files changed, 128 insertions(+), 653 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
deleted file mode 100644
index d44865a6491..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
-
-/** The interface exposes some runtime info for creating a {@link Committer}. 
*/
-@PublicEvolving
-public interface CommitterInitContext extends InitContext {
-    /** @return The metric group this committer belongs to. */
-    SinkCommitterMetricGroup metricGroup();
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
deleted file mode 100644
index 980bcb32ef1..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/** A {@link SinkWriter} that performs the first part of a two-phase commit 
protocol. */
-@PublicEvolving
-public interface CommittingSinkWriter<InputT, CommittableT> extends 
SinkWriter<InputT> {
-    /**
-     * Prepares for a commit.
-     *
-     * <p>This method will be called after {@link #flush(boolean)} and before 
{@link
-     * StatefulSinkWriter#snapshotState(long)}.
-     *
-     * @return The data to commit as the second step of the two-phase commit 
protocol.
-     * @throws IOException if fail to prepare for a commit.
-     */
-    Collection<CommittableT> prepareCommit() throws IOException, 
InterruptedException;
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index f5522c497d1..bc769ddcd06 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -31,13 +30,12 @@ import org.apache.flink.util.UserCodeClassLoader;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.OptionalLong;
 import java.util.function.Consumer;
 
 /**
  * Base interface for developing a sink. A basic {@link Sink} is a stateless 
sink that can flush
  * data on checkpoint to achieve at-least-once consistency. Sinks with 
additional requirements
- * should implement {@link SupportsWriterState} or {@link SupportsCommitter}.
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
  *
  * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
  * respective sink writers are transient and will only be created in the 
subtasks on the
@@ -54,30 +52,11 @@ public interface Sink<InputT> extends Serializable {
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
-     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. 
For backward
-     *     compatibility reasons - to keep {@link Sink} a functional interface 
- Flink did not
-     *     provide a default implementation. New {@link Sink} implementations 
should implement this
-     *     method, but it will not be used, and it will be removed in 1.20.0 
release. Do not use
-     *     {@link Override} annotation when implementing this method, to 
prevent compilation errors
-     *     when migrating to 1.20.x release.
      */
-    @Deprecated
     SinkWriter<InputT> createWriter(InitContext context) throws IOException;
 
-    /**
-     * Creates a {@link SinkWriter}.
-     *
-     * @param context the runtime context.
-     * @return A sink writer.
-     * @throws IOException for any failure during creation.
-     */
-    default SinkWriter<InputT> createWriter(WriterInitContext context) throws 
IOException {
-        return createWriter(new InitContextWrapper(context));
-    }
-
     /** The interface exposes some runtime info for creating a {@link 
SinkWriter}. */
     @PublicEvolving
-    @Deprecated
     interface InitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
         /**
          * Gets the {@link UserCodeClassLoader} to load classes that are not 
in system's classpath,
@@ -131,80 +110,4 @@ public interface Sink<InputT> extends Serializable {
             return Optional.empty();
         }
     }
-
-    /**
-     * Class for wrapping a new {@link WriterInitContext} to an old {@link 
InitContext} until
-     * deprecation.
-     *
-     * @deprecated Internal, do not use it.
-     */
-    @Deprecated
-    class InitContextWrapper implements InitContext {
-        private final WriterInitContext wrapped;
-
-        InitContextWrapper(WriterInitContext wrapped) {
-            this.wrapped = wrapped;
-        }
-
-        @Override
-        public int getSubtaskId() {
-            return wrapped.getSubtaskId();
-        }
-
-        @Override
-        public int getNumberOfParallelSubtasks() {
-            return wrapped.getNumberOfParallelSubtasks();
-        }
-
-        @Override
-        public int getAttemptNumber() {
-            return wrapped.getAttemptNumber();
-        }
-
-        @Override
-        public OptionalLong getRestoredCheckpointId() {
-            return wrapped.getRestoredCheckpointId();
-        }
-
-        @Override
-        public JobID getJobId() {
-            return wrapped.getJobId();
-        }
-
-        @Override
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            return wrapped.getUserCodeClassLoader();
-        }
-
-        @Override
-        public MailboxExecutor getMailboxExecutor() {
-            return wrapped.getMailboxExecutor();
-        }
-
-        @Override
-        public ProcessingTimeService getProcessingTimeService() {
-            return wrapped.getProcessingTimeService();
-        }
-
-        @Override
-        public SinkWriterMetricGroup metricGroup() {
-            return wrapped.metricGroup();
-        }
-
-        @Override
-        public SerializationSchema.InitializationContext
-                asSerializationSchemaInitializationContext() {
-            return wrapped.asSerializationSchemaInitializationContext();
-        }
-
-        @Override
-        public boolean isObjectReuseEnabled() {
-            return wrapped.isObjectReuseEnabled();
-        }
-
-        @Override
-        public <IN> TypeSerializer<IN> createInputSerializer() {
-            return wrapped.createInputSerializer();
-        }
-    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
index 5a3772b0d9e..a1814669fbc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -19,9 +19,11 @@
 package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 /**
  * A {@link Sink} with a stateful {@link SinkWriter}.
@@ -32,46 +34,51 @@ import java.util.Collection;
  *
  * @param <InputT> The type of the sink's input
  * @param <WriterStateT> The type of the sink writer's state
- * @deprecated Please implement {@link Sink} and {@link SupportsWriterState} 
instead.
  */
 @PublicEvolving
-@Deprecated
-public interface StatefulSink<InputT, WriterStateT>
-        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
 
     /**
-     * Create a {@link 
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
-     * state.
+     * Create a {@link StatefulSinkWriter}.
      *
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
      */
-    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-            Sink.InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException {
-        throw new UnsupportedOperationException(
-                "Deprecated, please use restoreWriter(WriterInitContext, 
Collection<WriterStateT>)");
-    }
+    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) 
throws IOException;
 
     /**
-     * Create a {@link 
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
-     * state.
+     * Create a {@link StatefulSinkWriter} from a recovered state.
      *
      * @param context the runtime context.
      * @return A sink writer.
      * @throws IOException for any failure during creation.
      */
-    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-            WriterInitContext context, Collection<WriterStateT> 
recoveredState) throws IOException {
-        return restoreWriter(new InitContextWrapper(context), recoveredState);
-    }
+    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            InitContext context, Collection<WriterStateT> recoveredState) 
throws IOException;
+
+    /**
+     * Any stateful sink needs to provide this state serializer and implement 
{@link
+     * StatefulSinkWriter#snapshotState(long)} properly. The respective state 
is used in {@link
+     * #restoreWriter(InitContext, Collection)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
 
     /**
      * A mix-in for {@link StatefulSink} that allows users to migrate from a 
sink with a compatible
      * state to this sink.
      */
     @PublicEvolving
-    interface WithCompatibleState extends 
SupportsWriterState.WithCompatibleState {}
+    interface WithCompatibleState {
+        /**
+         * A list of state names of sinks from which the state can be 
restored. For example, the new
+         * {@code FileSink} can resume from the state of an old {@code 
StreamingFileSink} as a
+         * drop-in replacement when resuming from a checkpoint/savepoint.
+         */
+        Collection<String> getCompatibleWriterStateNames();
+    }
 
     /**
      * A {@link SinkWriter} whose state needs to be checkpointed.
@@ -80,6 +87,11 @@ public interface StatefulSink<InputT, WriterStateT>
      * @param <WriterStateT> The type of the writer's state
      */
     @PublicEvolving
-    interface StatefulSinkWriter<InputT, WriterStateT>
-            extends 
org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
+    interface StatefulSinkWriter<InputT, WriterStateT> extends 
SinkWriter<InputT> {
+        /**
+         * @return The writer's state.
+         * @throws IOException if fail to snapshot writer's state.
+         */
+        List<WriterStateT> snapshotState(long checkpointId) throws IOException;
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
deleted file mode 100644
index 2f0d82045e6..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A {@link SinkWriter} whose state needs to be checkpointed.
- *
- * @param <InputT> The type of the sink writer's input
- * @param <WriterStateT> The type of the writer's state
- */
-@PublicEvolving
-public interface StatefulSinkWriter<InputT, WriterStateT> extends 
SinkWriter<InputT> {
-    /**
-     * @return The writer's state.
-     * @throws IOException if fail to snapshot writer's state.
-     */
-    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
deleted file mode 100644
index 12a714e8382..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A mixin interface for a {@link Sink} which supports exactly-once semantics 
using a two-phase
- * commit protocol. The {@link Sink} consists of a {@link 
CommittingSinkWriter} that performs the
- * precommits and a {@link Committer} that actually commits the data. To 
facilitate the separation
- * the {@link CommittingSinkWriter} creates <i>committables</i> on checkpoint 
or end of input and
- * the sends it to the {@link Committer}.
- *
- * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
- * respective sink writers and committers are transient and will only be 
created in the subtasks on
- * the taskmanagers.
- *
- * @param <CommittableT> The type of the committables.
- */
-@PublicEvolving
-public interface SupportsCommitter<CommittableT> {
-
-    /**
-     * Creates a {@link Committer} that permanently makes the previously 
written data visible
-     * through {@link Committer#commit(Collection)}.
-     *
-     * @param context The context information for the committer initialization.
-     * @return A committer for the two-phase commit protocol.
-     * @throws IOException for any failure during creation.
-     */
-    Committer<CommittableT> createCommitter(CommitterInitContext context) 
throws IOException;
-
-    /** Returns the serializer of the committable type. */
-    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
deleted file mode 100644
index 5ee49bcdaa1..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A mixin interface for a {@link Sink} which supports a stateful {@link 
StatefulSinkWriter}.
- *
- * <p>The {@link Sink} needs to be serializable. All configuration should be 
validated eagerly. The
- * respective sink writers are transient and will only be created in the 
subtasks on the
- * taskmanagers.
- *
- * @param <InputT> The type of the sink's input
- * @param <WriterStateT> The type of the sink writer's state
- */
-@PublicEvolving
-public interface SupportsWriterState<InputT, WriterStateT> {
-
-    /**
-     * Create a {@link StatefulSinkWriter} from a recovered state.
-     *
-     * @param context the runtime context.
-     * @param recoveredState the state to recover from.
-     * @return A sink writer.
-     * @throws IOException for any failure during creation.
-     */
-    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
-            WriterInitContext context, Collection<WriterStateT> 
recoveredState) throws IOException;
-
-    /**
-     * Any stateful sink needs to provide this state serializer and implement 
{@link
-     * StatefulSinkWriter#snapshotState(long)} properly. The respective state 
is used in {@link
-     * #restoreWriter(WriterInitContext, Collection)} on recovery.
-     *
-     * @return the serializer of the writer's state type.
-     */
-    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
-
-    /**
-     * A mix-in for {@link SupportsWriterState} that allows users to migrate 
from a sink with a
-     * compatible state to this sink.
-     */
-    @PublicEvolving
-    interface WithCompatibleState {
-        /**
-         * A collection of state names of sinks from which the state can be 
restored. For example,
-         * the new {@code FileSink} can resume from the state of an old {@code 
StreamingFileSink} as
-         * a drop-in replacement when resuming from a checkpoint/savepoint.
-         */
-        Collection<String> getCompatibleWriterStateNames();
-    }
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
index d5f10ec3320..b2cf15565fb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -19,6 +19,9 @@
 package org.apache.flink.api.connector.sink2;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -35,12 +38,19 @@ import java.util.Collection;
  *
  * @param <InputT> The type of the sink's input
  * @param <CommT> The type of the committables.
- * @deprecated Please implement {@link Sink} {@link SupportsCommitter} instead.
  */
 @PublicEvolving
-@Deprecated
-public interface TwoPhaseCommittingSink<InputT, CommT>
-        extends Sink<InputT>, SupportsCommitter<CommT> {
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link PrecommittingSinkWriter} that creates committables on 
checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) 
throws IOException;
 
     /**
      * Creates a {@link Committer} that permanently makes the previously 
written data visible
@@ -68,8 +78,29 @@ public interface TwoPhaseCommittingSink<InputT, CommT>
         return createCommitter();
     }
 
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
     /** A {@link SinkWriter} that performs the first part of a two-phase 
commit protocol. */
     @PublicEvolving
-    @Deprecated
-    interface PrecommittingSinkWriter<InputT, CommT> extends 
CommittingSinkWriter<InputT, CommT> {}
+    interface PrecommittingSinkWriter<InputT, CommT> extends 
SinkWriter<InputT> {
+        /**
+         * Prepares for a commit.
+         *
+         * <p>This method will be called after {@link #flush(boolean)} and 
before {@link
+         * StatefulSinkWriter#snapshotState(long)}.
+         *
+         * @return The data to commit as the second step of the two-phase 
commit protocol.
+         * @throws IOException if fail to prepare for a commit.
+         */
+        Collection<CommT> prepareCommit() throws IOException, 
InterruptedException;
+    }
+
+    /** The interface exposes some runtime info for creating a {@link 
Committer}. */
+    @PublicEvolving
+    interface CommitterInitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
+
+        /** @return The metric group this committer belongs to. */
+        SinkCommitterMetricGroup metricGroup();
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
deleted file mode 100644
index 38e2e38c318..00000000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.Optional;
-import java.util.function.Consumer;
-
-/** The interface exposes some runtime info for creating a {@link SinkWriter}. 
*/
-@PublicEvolving
-public interface WriterInitContext extends 
org.apache.flink.api.connector.sink2.InitContext {
-    /**
-     * Gets the {@link UserCodeClassLoader} to load classes that are not in 
system's classpath, but
-     * are part of the jar file of a user job.
-     *
-     * @see UserCodeClassLoader
-     */
-    UserCodeClassLoader getUserCodeClassLoader();
-
-    /**
-     * Returns the mailbox executor that allows to execute {@link Runnable}s 
inside the task thread
-     * in between record processing.
-     *
-     * <p>Note that this method should not be used per-record for performance 
reasons in the same
-     * way as records should not be sent to the external system individually. 
Rather, implementers
-     * are expected to batch records and only enqueue a single {@link 
Runnable} per batch to handle
-     * the result.
-     */
-    MailboxExecutor getMailboxExecutor();
-
-    /**
-     * Returns a {@link ProcessingTimeService} that can be used to get the 
current time and register
-     * timers.
-     */
-    ProcessingTimeService getProcessingTimeService();
-
-    /** @return The metric group this writer belongs to. */
-    SinkWriterMetricGroup metricGroup();
-
-    /** Provides a view on this context as a {@link 
SerializationSchema.InitializationContext}. */
-    SerializationSchema.InitializationContext 
asSerializationSchemaInitializationContext();
-
-    /** Returns whether object reuse has been enabled or disabled. */
-    boolean isObjectReuseEnabled();
-
-    /** Creates a serializer for the type of sink's input. */
-    <IN> TypeSerializer<IN> createInputSerializer();
-
-    /**
-     * Returns a metadata consumer, the {@link SinkWriter} can publish 
metadata events of type
-     * {@link MetaT} to the consumer.
-     *
-     * <p>It is recommended to use a separate thread pool to publish the 
metadata because enqueuing
-     * a lot of these messages in the mailbox may lead to a performance 
decrease. thread, and the
-     * {@link Consumer#accept} method is executed very fast.
-     */
-    @Experimental
-    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
-        return Optional.empty();
-    }
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index 7171a5168a7..baaa8714fff 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -84,14 +84,4 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
     public int getNumberOfFailedCommittables() {
         return numberOfFailedCommittables;
     }
-
-    public <NewCommT> CommittableSummary<NewCommT> map() {
-        return new CommittableSummary<>(
-                subtaskId,
-                numberOfSubtasks,
-                checkpointId,
-                numberOfCommittables,
-                numberOfPendingCommittables,
-                numberOfFailedCommittables);
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index a792a3ad48c..1dddcc79256 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import javax.annotation.Nullable;
 
 import java.util.OptionalLong;
-import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -56,8 +55,4 @@ public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT>
     public OptionalLong getCheckpointId() {
         return checkpointId == null ? OptionalLong.empty() : 
OptionalLong.of(checkpointId);
     }
-
-    public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, 
NewCommT> mapper) {
-        return new CommittableWithLineage<>(mapper.apply(committable), 
checkpointId, subtaskId);
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
deleted file mode 100644
index dc89423a824..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Allows expert users to implement a custom topology after {@link Committer}.
- *
- * <p>It is recommended to use immutable committables because mutating 
committables can have
- * unexpected side-effects.
- */
-@Experimental
-public interface SupportsPostCommitTopology<CommittableT> {
-
-    /**
-     * Adds a custom post-commit topology where all committables can be 
processed.
-     *
-     * <p>It is strongly recommended to keep this pipeline stateless such that 
batch and streaming
-     * modes do not require special cases.
-     *
-     * <p>All operations need to be idempotent: on recovery, any number of 
committables may be
-     * replayed that have already been committed. It's mandatory that these 
committables have no
-     * effect on the external system.
-     *
-     * @param committables the stream of committables.
-     */
-    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> 
committables);
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
deleted file mode 100644
index 67f277b1b45..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Allows expert users to implement a custom topology after {@link SinkWriter} 
and before {@link
- * Committer}.
- *
- * <p>It is recommended to use immutable committables because mutating 
committables can have
- * unexpected side-effects.
- */
-@Experimental
-public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
-
-    /**
-     * Intercepts and modifies the committables sent on checkpoint or at end 
of input. Implementers
-     * need to ensure to modify all {@link CommittableMessage}s appropriately.
-     *
-     * @param committables the stream of committables.
-     * @return the custom topology before {@link Committer}.
-     */
-    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
-            DataStream<CommittableMessage<WriterResultT>> committables);
-
-    /** Returns the serializer of the WriteResult type. */
-    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
deleted file mode 100644
index 3e84b1ef4b9..00000000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
-@Experimental
-public interface SupportsPreWriteTopology<InputT> {
-
-    /**
-     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
-     * data.
-     *
-     * @param inputDataStream the stream of input records.
-     * @return the custom topology before {@link SinkWriter}.
-     */
-    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
index 8fb516be550..17d1c685841 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
@@ -21,18 +21,29 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * Allows expert users to implement a custom topology after {@link Committer}.
  *
  * <p>It is recommended to use immutable committables because mutating 
committables can have
  * unexpected side-effects.
- *
- * @deprecated Please implement {@link 
org.apache.flink.api.connector.sink2.Sink}, {@link
- *     org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link
- *     SupportsPostCommitTopology} instead.
  */
 @Experimental
-@Deprecated
 public interface WithPostCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT>, 
SupportsPostCommitTopology<CommT> {}
+        extends TwoPhaseCommittingSink<InputT, CommT> {
+
+    /**
+     * Adds a custom post-commit topology where all committables can be 
processed.
+     *
+     * <p>It is strongly recommended to keep this pipeline stateless such that 
batch and streaming
+     * modes do not require special cases.
+     *
+     * <p>All operations need to be idempotent: on recovery, any number of 
committables may be
+     * replayed that have already been committed. It's mandatory that these 
committables have no
+     * effect on the external system.
+     *
+     * @param committables the stream of committables.
+     */
+    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> 
committables);
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
index 88f4a007e79..6d7219d7d9d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * Allows expert users to implement a custom topology after {@link SinkWriter} 
and before {@link
@@ -30,17 +30,18 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
  *
  * <p>It is recommended to use immutable committables because mutating 
committables can have
  * unexpected side-effects.
- *
- * @deprecated Please implement {@link 
org.apache.flink.api.connector.sink2.Sink}, {@link
- *     org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link 
SupportsPreCommitTopology}
- *     instead.
  */
 @Experimental
-@Deprecated
 public interface WithPreCommitTopology<InputT, CommT>
-        extends TwoPhaseCommittingSink<InputT, CommT>, 
SupportsPreCommitTopology<CommT, CommT> {
-    /** Defaults to {@link #getCommittableSerializer} for backward 
compatibility. */
-    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
-        return getCommittableSerializer();
-    }
+        extends TwoPhaseCommittingSink<InputT, CommT> {
+
+    /**
+     * Intercepts and modifies the committables sent on checkpoint or at end 
of input. Implementers
+     * need to ensure to modify all {@link CommittableMessage}s appropriately.
+     *
+     * @param committables the stream of committables.
+     * @return the custom topology before {@link Committer}.
+     */
+    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
+            DataStream<CommittableMessage<CommT>> committables);
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
index dccd892cced..6a16b420439 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
@@ -21,13 +21,18 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
-/**
- * Allows expert users to implement a custom topology before {@link 
SinkWriter}.
- *
- * @deprecated Please implement {@link Sink} and {@link 
SupportsPreWriteTopology} instead.
- */
+/** Allows expert users to implement a custom topology before {@link 
SinkWriter}. */
 @Experimental
-@Deprecated
-public interface WithPreWriteTopology<InputT>
-        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}
+public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+
+    /**
+     * Adds an arbitrary topology before the writer. The topology may be used 
to repartition the
+     * data.
+     *
+     * @param inputDataStream the stream of input records.
+     * @return the custom topology before {@link SinkWriter}.
+     */
+    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 478898ff19b..66104b0c6b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
 import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 028d8317d80..cb5044be232 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
@@ -50,6 +49,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
+import static 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.CommitterInitContext;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index c0a9892d5ee..499e868f8c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
@@ -133,7 +132,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        WriterInitContext initContext = 
createInitContext(context.getRestoredCheckpointId());
+        InitContext initContext = 
createInitContext(context.getRestoredCheckpointId());
         if (context.isRestored()) {
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
@@ -240,7 +239,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
     }
 
-    private WriterInitContext createInitContext(OptionalLong 
restoredCheckpointId) {
+    private InitContext createInitContext(OptionalLong restoredCheckpointId) {
         return new InitContextImpl(
                 getRuntimeContext(),
                 processingTimeService,
@@ -269,7 +268,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
     }
 
-    private static class InitContextImpl extends InitContextBase implements 
WriterInitContext {
+    private static class InitContextImpl extends InitContextBase implements 
InitContext {
 
         private final ProcessingTimeService processingTimeService;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
index 6babf8582fd..0caf7bdb64b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.runtime.operators.sink;
 
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 
 /**
@@ -38,6 +38,6 @@ interface SinkWriterStateHandler<InputT> {
     void snapshotState(long checkpointId) throws Exception;
 
     /** Creates a writer, potentially using state from {@link 
StateInitializationContext}. */
-    SinkWriter<InputT> createWriter(
-            WriterInitContext initContext, StateInitializationContext context) 
throws Exception;
+    SinkWriter<InputT> createWriter(InitContext initContext, 
StateInitializationContext context)
+            throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index 5847425fbca..b2b5a5d5108 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -22,18 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.SupportsWriterState;
-import 
org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
 
@@ -62,7 +59,7 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
      */
     private final Collection<String> previousSinkStateNames;
 
-    private final Sink<InputT> sink;
+    private final StatefulSink<InputT, WriterStateT> sink;
 
     // ------------------------------- runtime fields 
---------------------------------------
 
@@ -91,7 +88,7 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
 
     @Override
     public SinkWriter<InputT> createWriter(
-            WriterInitContext initContext, StateInitializationContext context) 
throws Exception {
+            InitContext initContext, StateInitializationContext context) 
throws Exception {
         final ListState<byte[]> rawState =
                 
context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
         writerState =
@@ -115,14 +112,9 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
                 previousSinkStates.add(previousSinkState);
                 Iterables.addAll(states, previousSinkState.get());
             }
-
-            if (!(sink instanceof SupportsWriterState)) {
-                throw new IllegalArgumentException("Sink should implement 
SupportsWriterState");
-            }
-
-            sinkWriter = ((SupportsWriterState) 
sink).restoreWriter(initContext, states);
+            sinkWriter = sink.restoreWriter(initContext, states);
         } else {
-            sinkWriter = cast(sink.createWriter(initContext));
+            sinkWriter = sink.createWriter(initContext);
         }
         return sinkWriter;
     }
@@ -132,11 +124,4 @@ final class StatefulSinkWriterStateHandler<InputT, 
WriterStateT>
         writerState.update(sinkWriter.snapshotState(checkpointId));
         previousSinkStates.forEach(ListState::clear);
     }
-
-    private static StatefulSinkWriter cast(SinkWriter writer) {
-        Preconditions.checkArgument(
-                writer instanceof StatefulSinkWriter,
-                "The writer should implement StatefulSinkWriter");
-        return (StatefulSinkWriter) writer;
-    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
index f8f79da727d..f49da996224 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
@@ -18,8 +18,8 @@
 package org.apache.flink.streaming.runtime.operators.sink;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.runtime.state.StateInitializationContext;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,7 +35,7 @@ final class StatelessSinkWriterStateHandler<InputT> 
implements SinkWriterStateHa
 
     @Override
     public SinkWriter<InputT> createWriter(
-            WriterInitContext initContext, StateInitializationContext context) 
throws Exception {
+            InitContext initContext, StateInitializationContext context) 
throws Exception {
         return sink.createWriter(initContext);
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index fec52f6fa11..34d3b2dfeb5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
index af00546d09a..5f26d045960 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index 44a86e81b94..c6f9613c1c5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.api.connector.source.Boundedness;

Reply via email to