This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06192741d02714499c6d0f671b21bfa2588a21bf
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Dec 10 16:11:17 2021 +0100

    [FLINK-25569][core] Add decomposed Sink V2 interface.

    The new interface separates concerns and will make future refactorings and
    extensions easier. The user immediately sees which methods need to be
    implemented.
---
 .../flink/api/connector/sink2/Committer.java       | 103 +++++++++++++++++++
 .../org/apache/flink/api/connector/sink2/Sink.java | 109 +++++++++++++++++++++
 .../flink/api/connector/sink2/SinkWriter.java      |  72 ++++++++++++++
 .../flink/api/connector/sink2/StatefulSink.java    |  97 ++++++++++++++++++
 .../connector/sink2/TwoPhaseCommittingSink.java    |  80 +++++++++++++++
 5 files changed, 461 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
new file mode 100644
index 0000000..c51cce2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Committer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * The {@code Committer} is responsible for committing the data staged by the {@link
+ * TwoPhaseCommittingSink.PrecommittingSinkWriter} in the second step of a two-phase commit
+ * protocol.
+ *
+ * <p>A commit must be idempotent: If a failure occurs in Flink during the commit phase, Flink will
+ * restart from the previous checkpoint and re-attempt to commit all committables. Thus, some or all
+ * committables may have already been committed. These {@link CommitRequest}s must not change the
+ * external system and implementers are asked to signal {@link
+ * CommitRequest#signalAlreadyCommitted()}.
+ *
+ * @param <CommT> The type of information needed to commit the staged data
+ */
+@PublicEvolving
+public interface Committer<CommT> extends AutoCloseable {
+    /**
+     * Commit the given list of {@link CommT}.
+     *
+     * @param committables A list of commit requests staged by the sink writer.
+     * @throws IOException for reasons that may yield a complete restart of the job.
+     */
+    void commit(Collection<CommitRequest<CommT>> committables)
+            throws IOException, InterruptedException;
+
+    /**
+     * A request to commit a specific committable.
+     *
+     * @param <CommT> The type of information needed to commit the staged data
+     */
+    @PublicEvolving
+    interface CommitRequest<CommT> {
+
+        /** Returns the committable. */
+        CommT getCommittable();
+
+        /**
+         * Returns how many times this particular committable has been retried. Starts at 0 for the
+         * first attempt.
+         */
+        int getNumberOfRetries();
+
+        /**
+         * The commit failed for a known reason and should not be retried.
+         *
+         * <p>Currently calling this method only logs the error, discards the committable and
+         * continues. In the future the behaviour might be configurable.
+         */
+        void signalFailedWithKnownReason(Throwable t);
+
+        /**
+         * The commit failed for an unknown reason and should not be retried.
+         *
+         * <p>Currently calling this method fails the job. In the future the behaviour might be
+         * configurable.
+         */
+        void signalFailedWithUnknownReason(Throwable t);
+
+        /**
+         * The commit failed for a retriable reason. If the sink supports a retry maximum, this may
+         * permanently fail after reaching that maximum. Otherwise the committable will be retried
+         * as long as this method is invoked after each attempt.
+         */
+        void retryLater();
+
+        /**
+         * Updates the underlying committable and retries later (see {@link #retryLater()} for a
+         * description). This method can be used if a committable partially succeeded.
+         */
+        void updateAndRetryLater(CommT committable);
+
+        /**
+         * Signals that a committable is skipped as it was already committed in a previous run.
+         * Using this method is optional but eases bookkeeping and debugging. It also serves as
+         * code documentation for the branches dealing with recovery.
+         */
+        void signalAlreadyCommitted();
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
new file mode 100644
index 0000000..4052a6c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.OptionalLong;
+
+/**
+ * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
+ * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
+ * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
+ *
+ * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
+ * respective sink writers are transient and will only be created in the subtasks on the
+ * taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ */
+@PublicEvolving
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Creates a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    SinkWriter<InputT> createWriter(InitContext context) throws IOException;
+
+    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
+    @PublicEvolving
+    interface InitContext {
+        /**
+         * The first checkpoint id when an application is started and not recovered from a
+         * previously taken checkpoint or savepoint.
+         */
+        long INITIAL_CHECKPOINT_ID = 1;
+
+        /**
+         * Gets the {@link UserCodeClassLoader} to load classes that are not in the system's
+         * classpath, but are part of the jar file of a user job.
+         *
+         * @see UserCodeClassLoader
+         */
+        UserCodeClassLoader getUserCodeClassLoader();
+
+        /**
+         * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
+         * thread in between record processing.
+         *
+         * <p>Note that this method should not be used per-record for performance reasons in the
+         * same way as records should not be sent to the external system individually. Rather,
+         * implementers are expected to batch records and only enqueue a single {@link Runnable} per
+         * batch to handle the result.
+         */
+        MailboxExecutor getMailboxExecutor();
+
+        /**
+         * Returns a {@link ProcessingTimeService} that can be used to get the current time and
+         * register timers.
+         */
+        ProcessingTimeService getProcessingTimeService();
+
+        /** @return The id of the task where the writer runs. */
+        int getSubtaskId();
+
+        /** @return The number of parallel Sink tasks. */
+        int getNumberOfParallelSubtasks();
+
+        /** @return The metric group this writer belongs to. */
+        SinkWriterMetricGroup metricGroup();
+
+        /**
+         * Returns the id of the restored checkpoint, if state was restored from the snapshot of a
+         * previous execution.
+         */
+        OptionalLong getRestoredCheckpointId();
+
+        /**
+         * Provides a view on this context as a {@link SerializationSchema.InitializationContext}.
+         */
+        SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
new file mode 100644
index 0000000..9c3394d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+import java.io.IOException;
+
+/**
+ * The {@code SinkWriter} is responsible for writing data.
+ *
+ * @param <InputT> The type of the sink writer's input
+ */
+@PublicEvolving
+public interface SinkWriter<InputT> extends AutoCloseable {
+
+    /**
+     * Adds an element to the writer.
+     *
+     * @param element The input record
+     * @param context The additional information about the input record
+     * @throws IOException if it fails to add an element.
+     */
+    void write(InputT element, Context context) throws IOException, InterruptedException;
+
+    /**
+     * Called on checkpoint or at the end of input so that the writer can flush all pending data
+     * for at-least-once delivery.
+     */
+    void flush(boolean endOfInput) throws IOException, InterruptedException;
+
+    /**
+     * Adds a watermark to the writer.
+     *
+     * <p>This method is intended for advanced sinks that propagate watermarks.
+     *
+     * @param watermark The watermark.
+     * @throws IOException if it fails to add a watermark.
+     */
+    default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
+
+    /** Context that {@link #write} can use for getting additional data about an input record. */
+    @PublicEvolving
+    interface Context {
+
+        /** Returns the current event-time watermark. */
+        long currentWatermark();
+
+        /**
+         * Returns the timestamp of the current input record or {@code null} if the element does
+         * not have an assigned timestamp.
+         */
+        Long timestamp();
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
new file mode 100644
index 0000000..a181466
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A {@link Sink} with a stateful {@link SinkWriter}.
+ *
+ * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated
+ * eagerly. The respective sink writers are transient and will only be created in the subtasks on
+ * the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ */
+@PublicEvolving
+public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link StatefulSinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link StatefulSinkWriter} from a recovered state.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
+            InitContext context, Collection<WriterStateT> recoveredState) throws IOException;
+
+    /**
+     * Any stateful sink needs to provide this state serializer and implement {@link
+     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
+     * #restoreWriter(InitContext, Collection)} on recovery.
+     *
+     * @return the serializer of the writer's state type.
+     */
+    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();
+
+    /**
+     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
+     * state to this sink.
+     */
+    @PublicEvolving
+    interface WithCompatibleState {
+        /**
+         * A list of state names of sinks from which the state can be restored. For example, the
+         * new {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
+         * drop-in replacement when resuming from a checkpoint/savepoint.
+         */
+        Collection<String> getCompatibleWriterStateNames();
+    }
+
+    /**
+     * A {@link SinkWriter} whose state needs to be checkpointed.
+     *
+     * @param <InputT> The type of the sink writer's input
+     * @param <WriterStateT> The type of the writer's state
+     */
+    @PublicEvolving
+    interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
+        /**
+         * @return The writer's state.
+         * @throws IOException if it fails to snapshot the writer's state.
+         */
+        List<WriterStateT> snapshotState(long checkpointId) throws IOException;
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
new file mode 100644
index 0000000..7fcb159
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
+ * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
+ * actually commits the data. To facilitate the separation, the {@link SinkWriter} creates
+ * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
+ *
+ * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
+ * validated eagerly. The respective sink writers and committers are transient and will only be
+ * created in the subtasks on the taskmanagers.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <CommT> The type of the committables.
+ */
+@PublicEvolving
+public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
+
+    /**
+     * Creates a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
+     * input.
+     *
+     * @param context the runtime context.
+     * @return A sink writer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;
+
+    /**
+     * Creates a {@link Committer} that permanently makes the previously written data visible
+     * through {@link Committer#commit(Collection)}.
+     *
+     * @return A committer for the two-phase commit protocol.
+     * @throws IOException for any failure during creation.
+     */
+    Committer<CommT> createCommitter() throws IOException;
+
+    /** Returns the serializer of the committable type. */
+    SimpleVersionedSerializer<CommT> getCommittableSerializer();
+
+    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
+    @PublicEvolving
+    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
+        /**
+         * Prepares for a commit.
+         *
+         * <p>This method will be called after {@link #flush(boolean)} and before {@link
+         * StatefulSinkWriter#snapshotState(long)}.
+         *
+         * @return The data to commit as the second step of the two-phase commit protocol.
+         * @throws IOException if it fails to prepare a commit.
+         */
+        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
+    }
+}
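
As a quick orientation, the following is a minimal, hypothetical sketch (not part of this commit) of a stateless at-least-once sink built on the plain Sink/SinkWriter pair above: write() batches records, and flush(), which the runtime calls on checkpoint and at end of input, pushes them out. The class names and the System.out target are placeholders for a real client.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

import java.util.ArrayList;
import java.util.List;

/** Hypothetical example: a stateless sink that batches records and flushes them on checkpoint. */
public class ConsoleBatchSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) {
        // Non-serializable resources are created here, in the subtask on the taskmanager.
        return new ConsoleBatchWriter();
    }

    private static class ConsoleBatchWriter implements SinkWriter<String> {

        private final List<String> batch = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            batch.add(element); // batch records instead of emitting them one by one
        }

        @Override
        public void flush(boolean endOfInput) {
            // Called on checkpoint and at end of input; emit everything for at-least-once.
            batch.forEach(System.out::println);
            batch.clear();
        }

        @Override
        public void close() {
            // Release external resources here; nothing to do in this sketch.
        }
    }
}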

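A second hypothetical sketch, also not part of this commit, outlines the two-phase commit path: the PrecommittingSinkWriter hands staged committables to the Committer via prepareCommit(), and the Committer treats every CommitRequest idempotently, signalling work that was already committed on a previous attempt and retrying transient failures. The staging and commit helpers are stubs standing in for a real external system.

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical example: stages identifiers as committables and commits them idempotently. */
public class StagingExampleSink implements TwoPhaseCommittingSink<String, String> {

    @Override
    public PrecommittingSinkWriter<String, String> createWriter(InitContext context) {
        return new StagingWriter();
    }

    @Override
    public Committer<String> createCommitter() {
        return new StagingCommitter();
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        // Committables are plain strings in this sketch, so serialization is trivial.
        return new SimpleVersionedSerializer<String>() {
            @Override
            public int getVersion() { return 1; }

            @Override
            public byte[] serialize(String committable) {
                return committable.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String deserialize(int version, byte[] serialized) {
                return new String(serialized, StandardCharsets.UTF_8);
            }
        };
    }

    private static class StagingWriter implements PrecommittingSinkWriter<String, String> {

        private final List<String> staged = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            // A real connector would write to a staging location; here we only record a name.
            staged.add("staged-" + element);
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public Collection<String> prepareCommit() {
            // Called after flush() on checkpoint or end of input; hands work to the committer.
            List<String> committables = new ArrayList<>(staged);
            staged.clear();
            return committables;
        }

        @Override
        public void close() {}
    }

    private static class StagingCommitter implements Committer<String> {

        @Override
        public void commit(Collection<CommitRequest<String>> requests) {
            for (CommitRequest<String> request : requests) {
                if (isAlreadyCommitted(request.getCommittable())) {
                    // Commits must be idempotent: recovery may replay committables.
                    request.signalAlreadyCommitted();
                } else if (!tryCommit(request.getCommittable())) {
                    // Transient failure: ask the framework to retry this committable later.
                    request.retryLater();
                }
            }
        }

        // Placeholder checks standing in for calls to the external system.
        private boolean isAlreadyCommitted(String committable) { return false; }
        private boolean tryCommit(String committable) { return true; }

        @Override
        public void close() {}
    }
}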