This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f0b28bf47a3 Revert "[FLINK-33973] Add new interfaces for SinkV2 to
synchronize the API with the SourceV2 API"
f0b28bf47a3 is described below
commit f0b28bf47a31a7403d90da8d43cfbec2bc6c4869
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Jan 12 11:36:29 2024 +0100
Revert "[FLINK-33973] Add new interfaces for SinkV2 to synchronize the API
with the SourceV2 API"
This reverts commit 6afe98daf6190a77a67a1fa8ac8f12337a75f8e7.
---
.../api/connector/sink2/CommitterInitContext.java | 29 -------
.../api/connector/sink2/CommittingSinkWriter.java | 39 ---------
.../org/apache/flink/api/connector/sink2/Sink.java | 99 +---------------------
.../flink/api/connector/sink2/StatefulSink.java | 52 +++++++-----
.../api/connector/sink2/StatefulSinkWriter.java | 39 ---------
.../api/connector/sink2/SupportsCommitter.java | 55 ------------
.../api/connector/sink2/SupportsWriterState.java | 73 ----------------
.../connector/sink2/TwoPhaseCommittingSink.java | 43 ++++++++--
.../api/connector/sink2/WriterInitContext.java | 85 -------------------
.../api/connector/sink2/CommittableSummary.java | 10 ---
.../connector/sink2/CommittableWithLineage.java | 5 --
.../sink2/SupportsPostCommitTopology.java | 47 ----------
.../connector/sink2/SupportsPreCommitTopology.java | 49 -----------
.../connector/sink2/SupportsPreWriteTopology.java | 37 --------
.../connector/sink2/WithPostCommitTopology.java | 23 +++--
.../api/connector/sink2/WithPreCommitTopology.java | 23 ++---
.../api/connector/sink2/WithPreWriteTopology.java | 21 +++--
.../api/transformations/SinkV1Adapter.java | 1 -
.../runtime/operators/sink/CommitterOperator.java | 2 +-
.../runtime/operators/sink/SinkWriterOperator.java | 7 +-
.../operators/sink/SinkWriterStateHandler.java | 6 +-
.../sink/StatefulSinkWriterStateHandler.java | 29 ++-----
.../sink/StatelessSinkWriterStateHandler.java | 4 +-
.../api/graph/StreamingJobGraphGeneratorTest.java | 1 -
.../flink/streaming/util/TestExpandingSink.java | 1 -
.../scheduling/SpeculativeSchedulerITCase.java | 1 -
26 files changed, 128 insertions(+), 653 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
deleted file mode 100644
index d44865a6491..00000000000
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommitterInitContext.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
-
-/** The interface exposes some runtime info for creating a {@link Committer}. */
-@PublicEvolving
-public interface CommitterInitContext extends InitContext {
- /** @return The metric group this committer belongs to. */
- SinkCommitterMetricGroup metricGroup();
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
deleted file mode 100644
index 980bcb32ef1..00000000000
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
-@PublicEvolving
-public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> {
- /**
- * Prepares for a commit.
- *
- * <p>This method will be called after {@link #flush(boolean)} and before {@link
- * StatefulSinkWriter#snapshotState(long)}.
- *
- * @return The data to commit as the second step of the two-phase commit protocol.
- * @throws IOException if fail to prepare for a commit.
- */
- Collection<CommittableT> prepareCommit() throws IOException, InterruptedException;
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index f5522c497d1..bc769ddcd06 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -31,13 +30,12 @@ import org.apache.flink.util.UserCodeClassLoader;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
-import java.util.OptionalLong;
import java.util.function.Consumer;
/**
* Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
* data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
- * should implement {@link SupportsWriterState} or {@link SupportsCommitter}.
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
*
* <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
* respective sink writers are transient and will only be created in the subtasks on the
@@ -54,30 +52,11 @@ public interface Sink<InputT> extends Serializable {
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
- * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward
- * compatibility reasons - to keep {@link Sink} a functional interface - Flink did not
- * provide a default implementation. New {@link Sink} implementations should implement this
- * method, but it will not be used, and it will be removed in 1.20.0 release. Do not use
- * {@link Override} annotation when implementing this method, to prevent compilation errors
- * when migrating to 1.20.x release.
*/
- @Deprecated
SinkWriter<InputT> createWriter(InitContext context) throws IOException;
- /**
- * Creates a {@link SinkWriter}.
- *
- * @param context the runtime context.
- * @return A sink writer.
- * @throws IOException for any failure during creation.
- */
- default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
- return createWriter(new InitContextWrapper(context));
- }
-
/** The interface exposes some runtime info for creating a {@link SinkWriter}. */
@PublicEvolving
- @Deprecated
interface InitContext extends org.apache.flink.api.connector.sink2.InitContext {
/**
* Gets the {@link UserCodeClassLoader} to load classes that are not
in system's classpath,
@@ -131,80 +110,4 @@ public interface Sink<InputT> extends Serializable {
return Optional.empty();
}
}
-
- /**
- * Class for wrapping a new {@link WriterInitContext} to an old {@link
InitContext} until
- * deprecation.
- *
- * @deprecated Internal, do not use it.
- */
- @Deprecated
- class InitContextWrapper implements InitContext {
- private final WriterInitContext wrapped;
-
- InitContextWrapper(WriterInitContext wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public int getSubtaskId() {
- return wrapped.getSubtaskId();
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return wrapped.getNumberOfParallelSubtasks();
- }
-
- @Override
- public int getAttemptNumber() {
- return wrapped.getAttemptNumber();
- }
-
- @Override
- public OptionalLong getRestoredCheckpointId() {
- return wrapped.getRestoredCheckpointId();
- }
-
- @Override
- public JobID getJobId() {
- return wrapped.getJobId();
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return wrapped.getUserCodeClassLoader();
- }
-
- @Override
- public MailboxExecutor getMailboxExecutor() {
- return wrapped.getMailboxExecutor();
- }
-
- @Override
- public ProcessingTimeService getProcessingTimeService() {
- return wrapped.getProcessingTimeService();
- }
-
- @Override
- public SinkWriterMetricGroup metricGroup() {
- return wrapped.metricGroup();
- }
-
- @Override
- public SerializationSchema.InitializationContext
- asSerializationSchemaInitializationContext() {
- return wrapped.asSerializationSchemaInitializationContext();
- }
-
- @Override
- public boolean isObjectReuseEnabled() {
- return wrapped.isObjectReuseEnabled();
- }
-
- @Override
- public <IN> TypeSerializer<IN> createInputSerializer() {
- return wrapped.createInputSerializer();
- }
- }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
index 5a3772b0d9e..a1814669fbc 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -19,9 +19,11 @@
package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
/**
* A {@link Sink} with a stateful {@link SinkWriter}.
@@ -32,46 +34,51 @@ import java.util.Collection;
*
* @param <InputT> The type of the sink's input
* @param <WriterStateT> The type of the sink writer's state
- * @deprecated Please implement {@link Sink} and {@link SupportsWriterState}
instead.
*/
@PublicEvolving
-@Deprecated
-public interface StatefulSink<InputT, WriterStateT>
- extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
/**
- * Create a {@link
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
- * state.
+ * Create a {@link StatefulSinkWriter}.
*
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
*/
- default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
- throw new UnsupportedOperationException(
- "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)");
- }
+ StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;
/**
- * Create a {@link
org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
- * state.
+ * Create a {@link StatefulSinkWriter} from a recovered state.
*
* @param context the runtime context.
* @return A sink writer.
* @throws IOException for any failure during creation.
*/
- default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- WriterInitContext context, Collection<WriterStateT>
recoveredState) throws IOException {
- return restoreWriter(new InitContextWrapper(context), recoveredState);
- }
+ StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+ InitContext context, Collection<WriterStateT> recoveredState)
throws IOException;
+
+ /**
+ * Any stateful sink needs to provide this state serializer and implement
{@link
+ * StatefulSinkWriter#snapshotState(long)} properly. The respective state
is used in {@link
+ * #restoreWriter(InitContext, Collection)} on recovery.
+ *
+ * @return the serializer of the writer's state type.
+ */
+ SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
/**
* A mix-in for {@link StatefulSink} that allows users to migrate from a
sink with a compatible
* state to this sink.
*/
@PublicEvolving
- interface WithCompatibleState extends
SupportsWriterState.WithCompatibleState {}
+ interface WithCompatibleState {
+ /**
+ * A list of state names of sinks from which the state can be
restored. For example, the new
+ * {@code FileSink} can resume from the state of an old {@code
StreamingFileSink} as a
+ * drop-in replacement when resuming from a checkpoint/savepoint.
+ */
+ Collection<String> getCompatibleWriterStateNames();
+ }
/**
* A {@link SinkWriter} whose state needs to be checkpointed.
@@ -80,6 +87,11 @@ public interface StatefulSink<InputT, WriterStateT>
* @param <WriterStateT> The type of the writer's state
*/
@PublicEvolving
- interface StatefulSinkWriter<InputT, WriterStateT>
- extends
org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
+ interface StatefulSinkWriter<InputT, WriterStateT> extends
SinkWriter<InputT> {
+ /**
+ * @return The writer's state.
+ * @throws IOException if fail to snapshot writer's state.
+ */
+ List<WriterStateT> snapshotState(long checkpointId) throws IOException;
+ }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
deleted file mode 100644
index 2f0d82045e6..00000000000
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A {@link SinkWriter} whose state needs to be checkpointed.
- *
- * @param <InputT> The type of the sink writer's input
- * @param <WriterStateT> The type of the writer's state
- */
-@PublicEvolving
-public interface StatefulSinkWriter<InputT, WriterStateT> extends
SinkWriter<InputT> {
- /**
- * @return The writer's state.
- * @throws IOException if fail to snapshot writer's state.
- */
- List<WriterStateT> snapshotState(long checkpointId) throws IOException;
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
deleted file mode 100644
index 12a714e8382..00000000000
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A mixin interface for a {@link Sink} which supports exactly-once semantics
using a two-phase
- * commit protocol. The {@link Sink} consists of a {@link
CommittingSinkWriter} that performs the
- * precommits and a {@link Committer} that actually commits the data. To
facilitate the separation
- * the {@link CommittingSinkWriter} creates <i>committables</i> on checkpoint
or end of input and
- * the sends it to the {@link Committer}.
- *
- * <p>The {@link Sink} needs to be serializable. All configuration should be
validated eagerly. The
- * respective sink writers and committers are transient and will only be
created in the subtasks on
- * the taskmanagers.
- *
- * @param <CommittableT> The type of the committables.
- */
-@PublicEvolving
-public interface SupportsCommitter<CommittableT> {
-
- /**
- * Creates a {@link Committer} that permanently makes the previously
written data visible
- * through {@link Committer#commit(Collection)}.
- *
- * @param context The context information for the committer initialization.
- * @return A committer for the two-phase commit protocol.
- * @throws IOException for any failure during creation.
- */
- Committer<CommittableT> createCommitter(CommitterInitContext context)
throws IOException;
-
- /** Returns the serializer of the committable type. */
- SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
deleted file mode 100644
index 5ee49bcdaa1..00000000000
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A mixin interface for a {@link Sink} which supports a stateful {@link
StatefulSinkWriter}.
- *
- * <p>The {@link Sink} needs to be serializable. All configuration should be
validated eagerly. The
- * respective sink writers are transient and will only be created in the
subtasks on the
- * taskmanagers.
- *
- * @param <InputT> The type of the sink's input
- * @param <WriterStateT> The type of the sink writer's state
- */
-@PublicEvolving
-public interface SupportsWriterState<InputT, WriterStateT> {
-
- /**
- * Create a {@link StatefulSinkWriter} from a recovered state.
- *
- * @param context the runtime context.
- * @param recoveredState the state to recover from.
- * @return A sink writer.
- * @throws IOException for any failure during creation.
- */
- StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
- WriterInitContext context, Collection<WriterStateT>
recoveredState) throws IOException;
-
- /**
- * Any stateful sink needs to provide this state serializer and implement
{@link
- * StatefulSinkWriter#snapshotState(long)} properly. The respective state
is used in {@link
- * #restoreWriter(WriterInitContext, Collection)} on recovery.
- *
- * @return the serializer of the writer's state type.
- */
- SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
-
- /**
- * A mix-in for {@link SupportsWriterState} that allows users to migrate
from a sink with a
- * compatible state to this sink.
- */
- @PublicEvolving
- interface WithCompatibleState {
- /**
- * A collection of state names of sinks from which the state can be
restored. For example,
- * the new {@code FileSink} can resume from the state of an old {@code
StreamingFileSink} as
- * a drop-in replacement when resuming from a checkpoint/savepoint.
- */
- Collection<String> getCompatibleWriterStateNames();
- }
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
index d5f10ec3320..b2cf15565fb 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -19,6 +19,9 @@
package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import java.io.IOException;
import java.util.Collection;
@@ -35,12 +38,19 @@ import java.util.Collection;
*
* @param <InputT> The type of the sink's input
* @param <CommT> The type of the committables.
- * @deprecated Please implement {@link Sink} {@link SupportsCommitter} instead.
*/
@PublicEvolving
-@Deprecated
-public interface TwoPhaseCommittingSink<InputT, CommT>
- extends Sink<InputT>, SupportsCommitter<CommT> {
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+ /**
+ * Creates a {@link PrecommittingSinkWriter} that creates committables on
checkpoint or end of
+ * input.
+ *
+ * @param context the runtime context.
+ * @return A sink writer for the two-phase commit protocol.
+ * @throws IOException for any failure during creation.
+ */
+ PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context)
throws IOException;
/**
* Creates a {@link Committer} that permanently makes the previously
written data visible
@@ -68,8 +78,29 @@ public interface TwoPhaseCommittingSink<InputT, CommT>
return createCommitter();
}
+ /** Returns the serializer of the committable type. */
+ SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
/** A {@link SinkWriter} that performs the first part of a two-phase
commit protocol. */
@PublicEvolving
- @Deprecated
- interface PrecommittingSinkWriter<InputT, CommT> extends
CommittingSinkWriter<InputT, CommT> {}
+ interface PrecommittingSinkWriter<InputT, CommT> extends
SinkWriter<InputT> {
+ /**
+ * Prepares for a commit.
+ *
+ * <p>This method will be called after {@link #flush(boolean)} and
before {@link
+ * StatefulSinkWriter#snapshotState(long)}.
+ *
+ * @return The data to commit as the second step of the two-phase
commit protocol.
+ * @throws IOException if fail to prepare for a commit.
+ */
+ Collection<CommT> prepareCommit() throws IOException,
InterruptedException;
+ }
+
+ /** The interface exposes some runtime info for creating a {@link
Committer}. */
+ @PublicEvolving
+ interface CommitterInitContext extends
org.apache.flink.api.connector.sink2.InitContext {
+
+ /** @return The metric group this committer belongs to. */
+ SinkCommitterMetricGroup metricGroup();
+ }
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
deleted file mode 100644
index 38e2e38c318..00000000000
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.Optional;
-import java.util.function.Consumer;
-
-/** The interface exposes some runtime info for creating a {@link SinkWriter}.
*/
-@PublicEvolving
-public interface WriterInitContext extends
org.apache.flink.api.connector.sink2.InitContext {
- /**
- * Gets the {@link UserCodeClassLoader} to load classes that are not in
system's classpath, but
- * are part of the jar file of a user job.
- *
- * @see UserCodeClassLoader
- */
- UserCodeClassLoader getUserCodeClassLoader();
-
- /**
- * Returns the mailbox executor that allows to execute {@link Runnable}s
inside the task thread
- * in between record processing.
- *
- * <p>Note that this method should not be used per-record for performance
reasons in the same
- * way as records should not be sent to the external system individually.
Rather, implementers
- * are expected to batch records and only enqueue a single {@link
Runnable} per batch to handle
- * the result.
- */
- MailboxExecutor getMailboxExecutor();
-
- /**
- * Returns a {@link ProcessingTimeService} that can be used to get the
current time and register
- * timers.
- */
- ProcessingTimeService getProcessingTimeService();
-
- /** @return The metric group this writer belongs to. */
- SinkWriterMetricGroup metricGroup();
-
- /** Provides a view on this context as a {@link
SerializationSchema.InitializationContext}. */
- SerializationSchema.InitializationContext
asSerializationSchemaInitializationContext();
-
- /** Returns whether object reuse has been enabled or disabled. */
- boolean isObjectReuseEnabled();
-
- /** Creates a serializer for the type of sink's input. */
- <IN> TypeSerializer<IN> createInputSerializer();
-
- /**
- * Returns a metadata consumer, the {@link SinkWriter} can publish
metadata events of type
- * {@link MetaT} to the consumer.
- *
- * <p>It is recommended to use a separate thread pool to publish the
metadata because enqueuing
- * a lot of these messages in the mailbox may lead to a performance
decrease. thread, and the
- * {@link Consumer#accept} method is executed very fast.
- */
- @Experimental
- default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
- return Optional.empty();
- }
-}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index 7171a5168a7..baaa8714fff 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -84,14 +84,4 @@ public class CommittableSummary<CommT> implements
CommittableMessage<CommT> {
public int getNumberOfFailedCommittables() {
return numberOfFailedCommittables;
}
-
- public <NewCommT> CommittableSummary<NewCommT> map() {
- return new CommittableSummary<>(
- subtaskId,
- numberOfSubtasks,
- checkpointId,
- numberOfCommittables,
- numberOfPendingCommittables,
- numberOfFailedCommittables);
- }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index a792a3ad48c..1dddcc79256 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -25,7 +25,6 @@ import
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import javax.annotation.Nullable;
import java.util.OptionalLong;
-import java.util.function.Function;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,8 +55,4 @@ public class CommittableWithLineage<CommT> implements
CommittableMessage<CommT>
public OptionalLong getCheckpointId() {
return checkpointId == null ? OptionalLong.empty() :
OptionalLong.of(checkpointId);
}
-
- public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT,
NewCommT> mapper) {
- return new CommittableWithLineage<>(mapper.apply(committable),
checkpointId, subtaskId);
- }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
deleted file mode 100644
index dc89423a824..00000000000
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPostCommitTopology.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Allows expert users to implement a custom topology after {@link Committer}.
- *
- * <p>It is recommended to use immutable committables because mutating committables can have
- * unexpected side-effects.
- */
-@Experimental
-public interface SupportsPostCommitTopology<CommittableT> {
-
- /**
- * Adds a custom post-commit topology where all committables can be processed.
- *
- * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
- * modes do not require special cases.
- *
- * <p>All operations need to be idempotent: on recovery, any number of committables may be
- * replayed that have already been committed. It's mandatory that these committables have no
- * effect on the external system.
- *
- * @param committables the stream of committables.
- */
- void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
-}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
deleted file mode 100644
index 67f277b1b45..00000000000
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Allows expert users to implement a custom topology after {@link SinkWriter} and before {@link
- * Committer}.
- *
- * <p>It is recommended to use immutable committables because mutating committables can have
- * unexpected side-effects.
- */
-@Experimental
-public interface SupportsPreCommitTopology<WriterResultT, CommittableT> {
-
- /**
- * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
- * need to ensure to modify all {@link CommittableMessage}s appropriately.
- *
- * @param committables the stream of committables.
- * @return the custom topology before {@link Committer}.
- */
- DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
- DataStream<CommittableMessage<WriterResultT>> committables);
-
- /** Returns the serializer of the WriteResult type. */
- SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
-}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
deleted file mode 100644
index 3e84b1ef4b9..00000000000
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/SupportsPreWriteTopology.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.connector.sink2;
-
-import org.apache.flink.annotation.Experimental;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** Allows expert users to implement a custom topology before {@link SinkWriter}. */
-@Experimental
-public interface SupportsPreWriteTopology<InputT> {
-
- /**
- * Adds an arbitrary topology before the writer. The topology may be used to repartition the
- * data.
- *
- * @param inputDataStream the stream of input records.
- * @return the custom topology before {@link SinkWriter}.
- */
- DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
-}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
index 8fb516be550..17d1c685841 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.java
@@ -21,18 +21,29 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.streaming.api.datastream.DataStream;
/**
* Allows expert users to implement a custom topology after {@link Committer}.
*
* <p>It is recommended to use immutable committables because mutating committables can have
* unexpected side-effects.
- *
- * @deprecated Please implement {@link org.apache.flink.api.connector.sink2.Sink}, {@link
- * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link
- * SupportsPostCommitTopology} instead.
*/
@Experimental
-@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {}
+ extends TwoPhaseCommittingSink<InputT, CommT> {
+
+ /**
+ * Adds a custom post-commit topology where all committables can be processed.
+ *
+ * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
+ * modes do not require special cases.
+ *
+ * <p>All operations need to be idempotent: on recovery, any number of committables may be
+ * replayed that have already been committed. It's mandatory that these committables have no
+ * effect on the external system.
+ *
+ * @param committables the stream of committables.
+ */
+ void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
index 88f4a007e79..6d7219d7d9d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreCommitTopology.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
/**
* Allows expert users to implement a custom topology after {@link SinkWriter} and before {@link
@@ -30,17 +30,18 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
*
* <p>It is recommended to use immutable committables because mutating committables can have
* unexpected side-effects.
- *
- * @deprecated Please implement {@link org.apache.flink.api.connector.sink2.Sink}, {@link
- * org.apache.flink.api.connector.sink2.SupportsCommitter} and {@link SupportsPreCommitTopology}
- * instead.
*/
@Experimental
-@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
- extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> {
- /** Defaults to {@link #getCommittableSerializer} for backward compatibility. */
- default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
- return getCommittableSerializer();
- }
+ extends TwoPhaseCommittingSink<InputT, CommT> {
+
+ /**
+ * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
+ * need to ensure to modify all {@link CommittableMessage}s appropriately.
+ *
+ * @param committables the stream of committables.
+ * @return the custom topology before {@link Committer}.
+ */
+ DataStream<CommittableMessage<CommT>> addPreCommitTopology(
+ DataStream<CommittableMessage<CommT>> committables);
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
index dccd892cced..6a16b420439 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/WithPreWriteTopology.java
@@ -21,13 +21,18 @@ package org.apache.flink.streaming.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.streaming.api.datastream.DataStream;
-/**
- * Allows expert users to implement a custom topology before {@link SinkWriter}.
- *
- * @deprecated Please implement {@link Sink} and {@link SupportsPreWriteTopology} instead.
- */
+/** Allows expert users to implement a custom topology before {@link SinkWriter}. */
@Experimental
-@Deprecated
-public interface WithPreWriteTopology<InputT>
- extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}
+public interface WithPreWriteTopology<InputT> extends Sink<InputT> {
+
+ /**
+ * Adds an arbitrary topology before the writer. The topology may be used to repartition the
+ * data.
+ *
+ * @param inputDataStream the stream of input records.
+ * @return the custom topology before {@link SinkWriter}.
+ */
+ DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
index 478898ff19b..66104b0c6b4 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SinkV1Adapter.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink.ProcessingTimeService;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 028d8317d80..cb5044be232 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
@@ -50,6 +49,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.OptionalLong;
+import static org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.CommitterInitContext;
import static org.apache.flink.util.IOUtils.closeAll;
import static org.apache.flink.util.Preconditions.checkNotNull;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index c0a9892d5ee..499e868f8c4 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
@@ -133,7 +132,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
+ InitContext initContext = createInitContext(context.getRestoredCheckpointId());
if (context.isRestored()) {
if (committableSerializer != null) {
final ListState<List<CommT>> legacyCommitterState =
@@ -240,7 +239,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
}
}
- private WriterInitContext createInitContext(OptionalLong restoredCheckpointId) {
+ private InitContext createInitContext(OptionalLong restoredCheckpointId) {
return new InitContextImpl(
getRuntimeContext(),
processingTimeService,
@@ -269,7 +268,7 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab
}
}
- private static class InitContextImpl extends InitContextBase implements WriterInitContext {
+ private static class InitContextImpl extends InitContextBase implements InitContext {
private final ProcessingTimeService processingTimeService;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
index 6babf8582fd..0caf7bdb64b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.runtime.operators.sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.state.StateInitializationContext;
/**
@@ -38,6 +38,6 @@ interface SinkWriterStateHandler<InputT> {
void snapshotState(long checkpointId) throws Exception;
/** Creates a writer, potentially using state from {@link StateInitializationContext}. */
- SinkWriter<InputT> createWriter(
- WriterInitContext initContext, StateInitializationContext context) throws Exception;
+ SinkWriter<InputT> createWriter(InitContext initContext, StateInitializationContext context)
+ throws Exception;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
index 5847425fbca..b2b5a5d5108 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterStateHandler.java
@@ -22,18 +22,15 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.SupportsWriterState;
-import org.apache.flink.api.connector.sink2.SupportsWriterState.WithCompatibleState;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
@@ -62,7 +59,7 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
*/
private final Collection<String> previousSinkStateNames;
- private final Sink<InputT> sink;
+ private final StatefulSink<InputT, WriterStateT> sink;
// ------------------------------- runtime fields ---------------------------------------
@@ -91,7 +88,7 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
@Override
public SinkWriter<InputT> createWriter(
- WriterInitContext initContext, StateInitializationContext context) throws Exception {
+ InitContext initContext, StateInitializationContext context) throws Exception {
final ListState<byte[]> rawState =
context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
writerState =
@@ -115,14 +112,9 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
previousSinkStates.add(previousSinkState);
Iterables.addAll(states, previousSinkState.get());
}
-
- if (!(sink instanceof SupportsWriterState)) {
- throw new IllegalArgumentException("Sink should implement SupportsWriterState");
- }
-
- sinkWriter = ((SupportsWriterState) sink).restoreWriter(initContext, states);
+ sinkWriter = sink.restoreWriter(initContext, states);
} else {
- sinkWriter = cast(sink.createWriter(initContext));
+ sinkWriter = sink.createWriter(initContext);
}
return sinkWriter;
}
@@ -132,11 +124,4 @@ final class StatefulSinkWriterStateHandler<InputT, WriterStateT>
writerState.update(sinkWriter.snapshotState(checkpointId));
previousSinkStates.forEach(ListState::clear);
}
-
- private static StatefulSinkWriter cast(SinkWriter writer) {
- Preconditions.checkArgument(
- writer instanceof StatefulSinkWriter,
- "The writer should implement StatefulSinkWriter");
- return (StatefulSinkWriter) writer;
- }
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
index f8f79da727d..f49da996224 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessSinkWriterStateHandler.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.runtime.operators.sink;
import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,7 +35,7 @@ final class StatelessSinkWriterStateHandler<InputT> implements SinkWriterStateHa
@Override
public SinkWriter<InputT> createWriter(
- WriterInitContext initContext, StateInitializationContext context) throws Exception {
+ InitContext initContext, StateInitializationContext context) throws Exception {
return sink.createWriter(initContext);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index fec52f6fa11..34d3b2dfeb5 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
index af00546d09a..5f26d045960 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestExpandingSink.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index 44a86e81b94..c6f9613c1c5 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
import org.apache.flink.api.connector.source.Boundedness;