This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6afe98daf61 [FLINK-33973] Add new interfaces for SinkV2 to synchronize
the API with the SourceV2 API
6afe98daf61 is described below
commit 6afe98daf6190a77a67a1fa8ac8f12337a75f8e7
Author: pvary <[email protected]>
AuthorDate: Fri Jan 12 10:16:00 2024 +0100
[FLINK-33973] Add new interfaces for SinkV2 to synchronize the API with the
SourceV2 API
---
.../api/connector/sink2/CommitterInitContext.java | 25 ++----
.../api/connector/sink2/CommittingSinkWriter.java | 27 +++---
.../org/apache/flink/api/connector/sink2/Sink.java | 99 +++++++++++++++++++++-
.../flink/api/connector/sink2/StatefulSink.java | 52 +++++-------
.../api/connector/sink2/StatefulSinkWriter.java | 29 ++++---
.../api/connector/sink2/SupportsCommitter.java | 55 ++++++++++++
...{StatefulSink.java => SupportsWriterState.java} | 50 +++--------
.../connector/sink2/TwoPhaseCommittingSink.java | 43 ++--------
.../api/connector/sink2/WriterInitContext.java | 85 +++++++++++++++++++
.../api/connector/sink2/CommittableSummary.java | 10 +++
.../connector/sink2/CommittableWithLineage.java | 5 ++
...pology.java => SupportsPostCommitTopology.java} | 6 +-
...opology.java => SupportsPreCommitTopology.java} | 12 +--
...Topology.java => SupportsPreWriteTopology.java} | 3 +-
.../connector/sink2/WithPostCommitTopology.java | 23 ++---
.../api/connector/sink2/WithPreCommitTopology.java | 23 +++--
.../api/connector/sink2/WithPreWriteTopology.java | 21 ++---
.../api/transformations/SinkV1Adapter.java | 1 +
.../runtime/operators/sink/CommitterOperator.java | 2 +-
.../runtime/operators/sink/SinkWriterOperator.java | 7 +-
.../operators/sink/SinkWriterStateHandler.java | 6 +-
.../sink/StatefulSinkWriterStateHandler.java | 29 +++++--
.../sink/StatelessSinkWriterStateHandler.java | 4 +-
.../api/graph/StreamingJobGraphGeneratorTest.java | 1 +
.../flink/streaming/util/TestExpandingSink.java | 1 +
.../scheduling/SpeculativeSchedulerITCase.java | 1 +
26 files changed, 400 insertions(+), 220 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
similarity index 51%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to
flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
index 6a16b420439..d44865a6491 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
@@ -16,23 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
-/** Allows expert users to implement a custom topology before {@link
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
-
- /**
- * Adds an arbitrary topology before the writer. The topology may be used
to repartition the
- * data.
- *
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
- */
- DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+/** The interface exposes some runtime info for creating a {@link Committer}.
*/
+@PublicEvolving
+public interface CommitterInitContext extends InitContext {
+ /** @return The metric group this committer belongs to. */
+ SinkCommitterMetricGroup metricGroup();
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
similarity index 52%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to
flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
index 6a16b420439..980bcb32ef1 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
@@ -16,23 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
-/** Allows expert users to implement a custom topology before {@link
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+import java.io.IOException;
+import java.util.Collection;
+/** A {@link SinkWriter} that performs the first part of a two-phase commit
protocol. */
+@PublicEvolving
+public interface CommittingSinkWriter<InputT, CommittableT> extends
SinkWriter<InputT> {
/**
- * Adds an arbitrary topology before the writer. The topology may be used
to repartition the
- * data.
+ * Prepares for a commit.
*
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
+ * <p>This method will be called after {@link #flush(boolean)} and before
{@link
+ * StatefulSinkWriter#snapshotState(long)}.
+ *
+ * @return The data to commit as the second step of the two-phase commit
protocol.
+ * @throws IOException if it fails to prepare for a commit.
*/
- DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+ Collection<CommittableT> prepareCommit() throws IOException,
InterruptedException;
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index bc769ddcd06..f5522c497d1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -30,12 +31,13 @@ import org.apache.flink.util.UserCodeClassLoader;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.function.Consumer;
/**
* Base interface for developing a sink. A basic {@link Sink} is a stateless
sink that can flush
* data on checkpoint to achieve at-least-once consistency. Sinks with
additional requirements
- * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ * should implement {@link SupportsWriterState} or {@link SupportsCommitter}.
*
* <p>The {@link Sink} needs to be serializable. All configuration should be
validated eagerly. The
* respective sink writers are transient and will only be created in the
subtasks on the
@@ -52,11 +54,30 @@ public interface Sink<InputT> extends Serializable {
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
+ * @deprecated Please implement {@link #createWriter(WriterInitContext)}.
For backward
+ * compatibility reasons - to keep {@link Sink} a functional interface
- Flink did not
+ * provide a default implementation. New {@link Sink} implementations
should implement this
+ * method, but it will not be used, and it will be removed in 1.20.0
release. Do not use
+ * {@link Override} annotation when implementing this method, to
prevent compilation errors
+ * when migrating to 1.20.x release.
*/
+ @Deprecated
SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+ /**
+ * Creates a {@link SinkWriter}.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+ default SinkWriter<InputT> createWriter(WriterInitContext context) throws
IOException {
+ return createWriter(new InitContextWrapper(context));
+ }
+
/** The interface exposes some runtime info for creating a {@link
SinkWriter}. */
@PublicEvolving
+ @Deprecated
interface InitContext extends
org.apache.flink.api.connector.sink2.InitContext {
/**
* Gets the {@link UserCodeClassLoader} to load classes that are not
in system's classpath,
@@ -110,4 +131,80 @@ public interface Sink<InputT> extends Serializable {
return Optional.empty();
}
}
+
+ /**
+ * Class for wrapping a new {@link WriterInitContext} to an old {@link
InitContext} until
+ * deprecation.
+ *
+ * @deprecated Internal, do not use it.
+ */
+ @Deprecated
+ class InitContextWrapper implements InitContext {
+ private final WriterInitContext wrapped;
+
+ InitContextWrapper(WriterInitContext wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public int getSubtaskId() {
+ return wrapped.getSubtaskId();
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return wrapped.getNumberOfParallelSubtasks();
+ }
+
+ @Override
+ public int getAttemptNumber() {
+ return wrapped.getAttemptNumber();
+ }
+
+ @Override
+ public OptionalLong getRestoredCheckpointId() {
+ return wrapped.getRestoredCheckpointId();
+ }
+
+ @Override
+ public JobID getJobId() {
+ return wrapped.getJobId();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return wrapped.getUserCodeClassLoader();
+ }
+
+ @Override
+ public MailboxExecutor getMailboxExecutor() {
+ return wrapped.getMailboxExecutor();
+ }
+
+ @Override
+ public ProcessingTimeService getProcessingTimeService() {
+ return wrapped.getProcessingTimeService();
+ }
+
+ @Override
+ public SinkWriterMetricGroup metricGroup() {
+ return wrapped.metricGroup();
+ }
+
+ @Override
+ public SerializationSchema.InitializationContext
+ asSerializationSchemaInitializationContext() {
+ return wrapped.asSerializationSchemaInitializationContext();
+ }
+
+ @Override
+ public boolean isObjectReuseEnabled() {
+ return wrapped.isObjectReuseEnabled();
+ }
+
+ @Override
+ public <IN> TypeSerializer<IN> createInputSerializer() {
+ return wrapped.createInputSerializer();
+ }
+ }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
index a1814669fbc..5a3772b0d9e 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -19,11 +19,9 @@
package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
/**
* A {@link Sink} with a stateful {@link SinkWriter}.
@@ -34,51 +32,46 @@ import java.util.List;
*
* @param <InputT> The type of the sink's input
* @param <WriterStateT> The type of the sink writer's state
+ * @deprecated Please implement {@link Sink} and {@link SupportsWriterState}
instead.
*/
@PublicEvolving
-public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+@Deprecated
+public interface StatefulSink<InputT, WriterStateT>
+ extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {
/**
- * Create a {@link StatefulSinkWriter}.
+ * Create a {@link
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
+ * state.
*
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
*/
- StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context)
throws IOException;
+ default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+ Sink.InitContext context, Collection<WriterStateT> recoveredState)
throws IOException {
+ throw new UnsupportedOperationException(
+ "Deprecated, please use restoreWriter(WriterInitContext,
Collection<WriterStateT>)");
+ }
/**
- * Create a {@link StatefulSinkWriter} from a recovered state.
+ * Create a {@link
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
+ * state.
*
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
*/
- StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- InitContext context, Collection<WriterStateT> recoveredState)
throws IOException;
-
- /**
- * Any stateful sink needs to provide this state serializer and implement
{@link
- * StatefulSinkWriter#snapshotState(long)} properly. The respective state
is used in {@link
- * #restoreWriter(InitContext, Collection)} on recovery.
- *
- * @return the serializer of the writer's state type.
- */
- SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
+ default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+ WriterInitContext context, Collection<WriterStateT>
recoveredState) throws IOException {
+ return restoreWriter(new InitContextWrapper(context), recoveredState);
+ }
/**
* A mix-in for {@link StatefulSink} that allows users to migrate from a
sink with a compatible
* state to this sink.
*/
@PublicEvolving
- interface WithCompatibleState {
- /**
- * A list of state names of sinks from which the state can be
restored. For example, the new
- * {@code FileSink} can resume from the state of an old {@code
StreamingFileSink} as a
- * drop-in replacement when resuming from a checkpoint/savepoint.
- */
- Collection<String> getCompatibleWriterStateNames();
- }
+ interface WithCompatibleState extends
SupportsWriterState.WithCompatibleState {}
/**
* A {@link SinkWriter} whose state needs to be checkpointed.
@@ -87,11 +80,6 @@ public interface StatefulSink<InputT, WriterStateT> extends
Sink<InputT> {
* @param <WriterStateT> The type of the writer's state
*/
@PublicEvolving
- interface StatefulSinkWriter<InputT, WriterStateT> extends
SinkWriter<InputT> {
- /**
- * @return The writer's state.
- * @throws IOException if fail to snapshot writer's state.
- */
- List<WriterStateT> snapshotState(long checkpointId) throws IOException;
- }
+ interface StatefulSinkWriter<InputT, WriterStateT>
+ extends
org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
similarity index 52%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to
flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
index 6a16b420439..2f0d82045e6 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
@@ -16,23 +16,24 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.api.connector.sink2;
+package org.apache.flink.api.connector.sink2;
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.annotation.PublicEvolving;
-/** Allows expert users to implement a custom topology before {@link
SinkWriter}. */
-@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+import java.io.IOException;
+import java.util.List;
+/**
+ * A {@link SinkWriter} whose state needs to be checkpointed.
+ *
+ * @param <InputT> The type of the sink writer's input
+ * @param <WriterStateT> The type of the writer's state
+ */
+@PublicEvolving
+public interface StatefulSinkWriter<InputT, WriterStateT> extends
SinkWriter<InputT> {
/**
- * Adds an arbitrary topology before the writer. The topology may be used
to repartition the
- * data.
- *
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
+ * @return The writer's state.
+ * @throws IOException if it fails to snapshot the writer's state.
*/
- DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+ List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
new file mode 100644
index 00000000000..12a714e8382
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A mixin interface for a {@link Sink} which supports exactly-once semantics
using a two-phase
+ * commit protocol. The {@link Sink} consists of a {@link
CommittingSinkWriter} that performs the
+ * precommits and a {@link Committer} that actually commits the data. To
facilitate the separation
+ * the {@link CommittingSinkWriter} creates <i>committables</i> on checkpoint
or end of input and
+ * then sends them to the {@link Committer}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be
validated eagerly. The
+ * respective sink writers and committers are transient and will only be
created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <CommittableT> The type of the committables.
+ */
+@PublicEvolving
+public interface SupportsCommitter<CommittableT> {
+
+ /**
+ * Creates a {@link Committer} that permanently makes the previously
written data visible
+ * through {@link Committer#commit(Collection)}.
+ *
+ * @param context The context information for the committer initialization.
+ * @return A committer for the two-phase commit protocol.
+ * @throws IOException for any failure during creation.
+ */
+ Committer<CommittableT> createCommitter(CommitterInitContext context)
throws IOException;
+
+ /** Returns the serializer of the committable type. */
+ SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
similarity index 52%
copy from
flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
copy to
flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
index a1814669fbc..5ee49bcdaa1 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
@@ -23,75 +23,51 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
-import java.util.List;
/**
- * A {@link Sink} with a stateful {@link SinkWriter}.
+ * A mixin interface for a {@link Sink} which supports a stateful {@link
StatefulSinkWriter}.
*
- * <p>The {@link StatefulSink} needs to be serializable. All configuration
should be validated
- * eagerly. The respective sink writers are transient and will only be created
in the subtasks on
- * the taskmanagers.
+ * <p>The {@link Sink} needs to be serializable. All configuration should be
validated eagerly. The
+ * respective sink writers are transient and will only be created in the
subtasks on the
+ * taskmanagers.
*
* @param <InputT> The type of the sink's input
* @param <WriterStateT> The type of the sink writer's state
*/
@PublicEvolving
-public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
-
- /**
- * Create a {@link StatefulSinkWriter}.
- *
- * @param context the runtime context.
- * @return A sink writer.
- * @throws IOException for any failure during creation.
- */
- StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context)
throws IOException;
+public interface SupportsWriterState<InputT, WriterStateT> {
/**
* Create a {@link StatefulSinkWriter} from a recovered state.
*
* @param context the runtime context.
+ * @param recoveredState the state to recover from.
* @return A sink writer.
* @throws IOException for any failure during creation.
*/
StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- InitContext context, Collection<WriterStateT> recoveredState)
throws IOException;
+ WriterInitContext context, Collection<WriterStateT>
recoveredState) throws IOException;
/**
* Any stateful sink needs to provide this state serializer and implement
{@link
* StatefulSinkWriter#snapshotState(long)} properly. The respective state
is used in {@link
- * #restoreWriter(InitContext, Collection)} on recovery.
+ * #restoreWriter(WriterInitContext, Collection)} on recovery.
*
* @return the serializer of the writer's state type.
*/
SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
/**
- * A mix-in for {@link StatefulSink} that allows users to migrate from a
sink with a compatible
- * state to this sink.
+ * A mix-in for {@link SupportsWriterState} that allows users to migrate
from a sink with a
+ * compatible state to this sink.
*/
@PublicEvolving
interface WithCompatibleState {
/**
- * A list of state names of sinks from which the state can be
restored. For example, the new
- * {@code FileSink} can resume from the state of an old {@code
StreamingFileSink} as a
- * drop-in replacement when resuming from a checkpoint/savepoint.
+ * A collection of state names of sinks from which the state can be
restored. For example,
+ * the new {@code FileSink} can resume from the state of an old {@code
StreamingFileSink} as
+ * a drop-in replacement when resuming from a checkpoint/savepoint.
*/
Collection<String> getCompatibleWriterStateNames();
}
-
- /**
- * A {@link SinkWriter} whose state needs to be checkpointed.
- *
- * @param <InputT> The type of the sink writer's input
- * @param <WriterStateT> The type of the writer's state
- */
- @PublicEvolving
- interface StatefulSinkWriter<InputT, WriterStateT> extends
SinkWriter<InputT> {
- /**
- * @return The writer's state.
- * @throws IOException if fail to snapshot writer's state.
- */
- List<WriterStateT> snapshotState(long checkpointId) throws IOException;
- }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
index b2cf15565fb..d5f10ec3320 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -19,9 +19,6 @@
package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import java.io.IOException;
import java.util.Collection;
@@ -38,19 +35,12 @@ import java.util.Collection;
*
* @param <InputT> The type of the sink's input
* @param <CommT> The type of the committables.
+ * @deprecated Please implement {@link Sink} and {@link SupportsCommitter} instead.
*/
@PublicEvolving
-public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
-
- /**
- * Creates a {@link PrecommittingSinkWriter} that creates committables on
checkpoint or end of
- * input.
- *
- * @param context the runtime context.
- * @return A sink writer for the two-phase commit protocol.
- * @throws IOException for any failure during creation.
- */
- PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context)
throws IOException;
+@Deprecated
+public interface TwoPhaseCommittingSink<InputT, CommT>
+ extends Sink<InputT>, SupportsCommitter<CommT> {
/**
* Creates a {@link Committer} that permanently makes the previously
written data visible
@@ -78,29 +68,8 @@ public interface TwoPhaseCommittingSink<InputT, CommT>
extends Sink<InputT> {
return createCommitter();
}
- /** Returns the serializer of the committable type. */
- SimpleVersionedSerializer<CommT> getCommittableSerializer();
-
/** A {@link SinkWriter} that performs the first part of a two-phase
commit protocol. */
@PublicEvolving
- interface PrecommittingSinkWriter<InputT, CommT> extends
SinkWriter<InputT> {
- /**
- * Prepares for a commit.
- *
- * <p>This method will be called after {@link #flush(boolean)} and
before {@link
- * StatefulSinkWriter#snapshotState(long)}.
- *
- * @return The data to commit as the second step of the two-phase
commit protocol.
- * @throws IOException if fail to prepare for a commit.
- */
- Collection<CommT> prepareCommit() throws IOException,
InterruptedException;
- }
-
- /** The interface exposes some runtime info for creating a {@link
Committer}. */
- @PublicEvolving
- interface CommitterInitContext extends
org.apache.flink.api.connector.sink2.InitContext {
-
- /** @return The metric group this committer belongs to. */
- SinkCommitterMetricGroup metricGroup();
- }
+ @Deprecated
+ interface PrecommittingSinkWriter<InputT, CommT> extends
CommittingSinkWriter<InputT, CommT> {}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
new file mode 100644
index 00000000000..38e2e38c318
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/** The interface exposes some runtime info for creating a {@link SinkWriter}.
*/
+@PublicEvolving
+public interface WriterInitContext extends
org.apache.flink.api.connector.sink2.InitContext {
+ /**
+ * Gets the {@link UserCodeClassLoader} to load classes that are not in
system's classpath, but
+ * are part of the jar file of a user job.
+ *
+ * @see UserCodeClassLoader
+ */
+ UserCodeClassLoader getUserCodeClassLoader();
+
+ /**
+ * Returns the mailbox executor that allows to execute {@link Runnable}s
inside the task thread
+ * in between record processing.
+ *
+ * <p>Note that this method should not be used per-record for performance
reasons in the same
+ * way as records should not be sent to the external system individually.
Rather, implementers
+ * are expected to batch records and only enqueue a single {@link
Runnable} per batch to handle
+ * the result.
+ */
+ MailboxExecutor getMailboxExecutor();
+
+ /**
+ * Returns a {@link ProcessingTimeService} that can be used to get the
current time and register
+ * timers.
+ */
+ ProcessingTimeService getProcessingTimeService();
+
+ /** @return The metric group this writer belongs to. */
+ SinkWriterMetricGroup metricGroup();
+
+ /** Provides a view on this context as a {@link
SerializationSchema.InitializationContext}. */
+ SerializationSchema.InitializationContext
asSerializationSchemaInitializationContext();
+
+ /** Returns whether object reuse has been enabled or disabled. */
+ boolean isObjectReuseEnabled();
+
+ /** Creates a serializer for the type of sink's input. */
+ <IN> TypeSerializer<IN> createInputSerializer();
+
+ /**
+ * Returns a metadata consumer, the {@link SinkWriter} can publish
metadata events of type
+ * {@link MetaT} to the consumer.
+ *
+ * <p>It is recommended to use a separate thread pool to publish the
metadata because enqueuing
+ * a lot of these messages in the mailbox may lead to a performance
decrease. The
+ * {@link Consumer#accept} method is expected to execute very fast.
+ */
+ @Experimental
+ default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
+ return Optional.empty();
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index baaa8714fff..7171a5168a7 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -84,4 +84,14 @@ public class CommittableSummary<CommT> implements
CommittableMessage<CommT> {
public int getNumberOfFailedCommittables() {
return numberOfFailedCommittables;
}
+
+ public <NewCommT> CommittableSummary<NewCommT> map() {
+ return new CommittableSummary<>(
+ subtaskId,
+ numberOfSubtasks,
+ checkpointId,
+ numberOfCommittables,
+ numberOfPendingCommittables,
+ numberOfFailedCommittables);
+ }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index 1dddcc79256..a792a3ad48c 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -25,6 +25,7 @@ import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import javax.annotation.Nullable;
import java.util.OptionalLong;
+import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,4 +56,8 @@ public class CommittableWithLineage<CommT> implements
CommittableMessage<CommT>
public OptionalLong getCheckpointId() {
return checkpointId == null ? OptionalLong.empty() :
OptionalLong.of(checkpointId);
}
+
+ public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT,
NewCommT> mapper) {
+ return new CommittableWithLineage<>(mapper.apply(committable),
checkpointId, subtaskId);
+ }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
similarity index 90%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
index 17d1c685841..dc89423a824 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.streaming.api.datastream.DataStream;
/**
@@ -30,8 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
* unexpected side-effects.
*/
@Experimental
-public interface WithPostCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT> {
+public interface SupportsPostCommitTopology<CommittableT> {
/**
* Adds a custom post-commit topology where all committables can be
processed.
@@ -45,5 +43,5 @@ public interface WithPostCommitTopology<InputT, CommT>
*
* @param committables the stream of committables.
*/
- void addPostCommitTopology(DataStream<CommittableMessage<CommT>>
committables);
+ void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>>
committables);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
similarity index 80%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
index 6d7219d7d9d..67f277b1b45 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
/**
@@ -32,8 +32,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
* unexpected side-effects.
*/
@Experimental
-public interface WithPreCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT> {
+public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
/**
* Intercepts and modifies the committables sent on checkpoint or at end
of input. Implementers
@@ -42,6 +41,9 @@ public interface WithPreCommitTopology<InputT, CommT>
* @param committables the stream of committables.
* @return the custom topology before {@link Committer}.
*/
- DataStream<CommittableMessage<CommT>> addPreCommitTopology(
- DataStream<CommittableMessage<CommT>> committables);
+ DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
+ DataStream<CommittableMessage<WriterResultT>> committables);
+
+    /** Returns the serializer of the {@code WriterResultT} type produced by the writer. */
+    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
similarity index 92%
copy from
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
copy to
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
index 6a16b420439..3e84b1ef4b9 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
@@ -19,13 +19,12 @@
package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.streaming.api.datastream.DataStream;
/** Allows expert users to implement a custom topology before {@link
SinkWriter}. */
@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+public interface SupportsPreWriteTopology<InputT> {
/**
* Adds an arbitrary topology before the writer. The topology may be used
to repartition the
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
index 17d1c685841..8fb516be550 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
@@ -21,29 +21,18 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
/**
* Allows expert users to implement a custom topology after {@link Committer}.
*
* <p>It is recommended to use immutable committables because mutating
committables can have
* unexpected side-effects.
+ *
+ * @deprecated Please implement {@link
org.apache.flink.api.connector.sink2.Sink}, {@link
+ * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link
+ * SupportsPostCommitTopology} instead.
*/
@Experimental
+@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT> {
-
- /**
- * Adds a custom post-commit topology where all committables can be
processed.
- *
- * <p>It is strongly recommended to keep this pipeline stateless such that
batch and streaming
- * modes do not require special cases.
- *
- * <p>All operations need to be idempotent: on recovery, any number of
committables may be
- * replayed that have already been committed. It's mandatory that these
committables have no
- * effect on the external system.
- *
- * @param committables the stream of committables.
- */
- void addPostCommitTopology(DataStream<CommittableMessage<CommT>>
committables);
-}
+ extends TwoPhaseCommittingSink<InputT, CommT>,
SupportsPostCommitTopology<CommT> {}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
index 6d7219d7d9d..88f4a007e79 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
/**
* Allows expert users to implement a custom topology after {@link SinkWriter}
and before {@link
@@ -30,18 +30,17 @@ import org.apache.flink.streaming.api.datastream.DataStream;
*
* <p>It is recommended to use immutable committables because mutating
committables can have
* unexpected side-effects.
+ *
+ * @deprecated Please implement {@link
org.apache.flink.api.connector.sink2.Sink}, {@link
+ * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link
SupportsPreCommitTopology}
+ * instead.
*/
@Experimental
+@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT> {
-
- /**
- * Intercepts and modifies the committables sent on checkpoint or at end
of input. Implementers
- * need to ensure to modify all {@link CommittableMessage}s appropriately.
- *
- * @param committables the stream of committables.
- * @return the custom topology before {@link Committer}.
- */
- DataStream<CommittableMessage<CommT>> addPreCommitTopology(
- DataStream<CommittableMessage<CommT>> committables);
+ extends TwoPhaseCommittingSink<InputT, CommT>,
SupportsPreCommitTopology<CommT, CommT> {
+ /** Defaults to {@link #getCommittableSerializer} for backward
compatibility. */
+ default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
+ return getCommittableSerializer();
+ }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
index 6a16b420439..dccd892cced 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
@@ -21,18 +21,13 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-/** Allows expert users to implement a custom topology before {@link
SinkWriter}. */
+/**
+ * Allows expert users to implement a custom topology before {@link
SinkWriter}.
+ *
+ * @deprecated Please implement {@link Sink} and {@link
SupportsPreWriteTopology} instead.
+ */
@Experimental
-public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
-
- /**
- * Adds an arbitrary topology before the writer. The topology may be used
to repartition the
- * data.
- *
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
- */
- DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
-}
+@Deprecated
+public interface WithPreWriteTopology<InputT>
+ extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 66104b0c6b4..478898ff19b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index cb5044be232..028d8317d80 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
@@ -49,7 +50,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.OptionalLong;
-import static
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.CommitterInitContext;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 499e868f8c4..c0a9892d5ee 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
@@ -132,7 +133,7 @@ class SinkWriterOperator<InputT, CommT> extends
AbstractStreamOperator<Committab
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- InitContext initContext =
createInitContext(context.getRestoredCheckpointId());
+ WriterInitContext initContext =
createInitContext(context.getRestoredCheckpointId());
if (context.isRestored()) {
if (committableSerializer != null) {
final ListState<List<CommT>> legacyCommitterState =
@@ -239,7 +240,7 @@ class SinkWriterOperator<InputT, CommT> extends
AbstractStreamOperator<Committab
}
}
- private InitContext createInitContext(OptionalLong restoredCheckpointId) {
+ private WriterInitContext createInitContext(OptionalLong
restoredCheckpointId) {
return new InitContextImpl(
getRuntimeContext(),
processingTimeService,
@@ -268,7 +269,7 @@ class SinkWriterOperator<InputT, CommT> extends
AbstractStreamOperator<Committab
}
}
- private static class InitContextImpl extends InitContextBase implements
InitContext {
+ private static class InitContextImpl extends InitContextBase implements
WriterInitContext {
private final ProcessingTimeService processingTimeService;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
index 0caf7bdb64b..6babf8582fd 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.runtime.operators.sink;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.state.StateInitializationContext;
/**
@@ -38,6 +38,6 @@ interface SinkWriterStateHandler<InputT> {
void snapshotState(long checkpointId) throws Exception;
/** Creates a writer, potentially using state from {@link
StateInitializationContext}. */
- SinkWriter<InputT> createWriter(InitContext initContext,
StateInitializationContext context)
- throws Exception;
+ SinkWriter<InputT> createWriter(
+ WriterInitContext initContext, StateInitializationContext context)
throws Exception;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index b2b5a5d5108..5847425fbca 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -22,15 +22,18 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import
org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
@@ -59,7 +62,7 @@ final class StatefulSinkWriterStateHandler<InputT,
WriterStateT>
*/
private final Collection<String> previousSinkStateNames;
- private final StatefulSink<InputT, WriterStateT> sink;
+ private final Sink<InputT> sink;
// ------------------------------- runtime fields
---------------------------------------
@@ -88,7 +91,7 @@ final class StatefulSinkWriterStateHandler<InputT,
WriterStateT>
@Override
public SinkWriter<InputT> createWriter(
- InitContext initContext, StateInitializationContext context)
throws Exception {
+ WriterInitContext initContext, StateInitializationContext context)
throws Exception {
final ListState<byte[]> rawState =
context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
writerState =
@@ -112,9 +115,14 @@ final class StatefulSinkWriterStateHandler<InputT,
WriterStateT>
previousSinkStates.add(previousSinkState);
Iterables.addAll(states, previousSinkState.get());
}
- sinkWriter = sink.restoreWriter(initContext, states);
+
+ if (!(sink instanceof SupportsWriterState)) {
+ throw new IllegalArgumentException("Sink should implement
SupportsWriterState");
+ }
+
+ sinkWriter = ((SupportsWriterState)
sink).restoreWriter(initContext, states);
} else {
- sinkWriter = sink.createWriter(initContext);
+ sinkWriter = cast(sink.createWriter(initContext));
}
return sinkWriter;
}
@@ -124,4 +132,11 @@ final class StatefulSinkWriterStateHandler<InputT,
WriterStateT>
writerState.update(sinkWriter.snapshotState(checkpointId));
previousSinkStates.forEach(ListState::clear);
}
+
+ private static StatefulSinkWriter cast(SinkWriter writer) {
+ Preconditions.checkArgument(
+ writer instanceof StatefulSinkWriter,
+ "The writer should implement StatefulSinkWriter");
+ return (StatefulSinkWriter) writer;
+ }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
index f49da996224..f8f79da727d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,7 +35,7 @@ final class StatelessSinkWriterStateHandler<InputT>
implements SinkWriterStateHa
@Override
public SinkWriter<InputT> createWriter(
- InitContext initContext, StateInitializationContext context)
throws Exception {
+ WriterInitContext initContext, StateInitializationContext context)
throws Exception {
return sink.createWriter(initContext);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 34d3b2dfeb5..fec52f6fa11 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
index 5f26d045960..af00546d09a 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index c6f9613c1c5..44a86e81b94 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.api.connector.source.Boundedness;