AHeise commented on a change in pull request #16701: URL: https://github.com/apache/flink/pull/16701#discussion_r682584648
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperatorFactory.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +import java.util.Optional; + +/** + * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * SinkOperator}. + * + * @param <InputT> The input type of the {@link SinkWriter}. + * @param <CommT> The committable type of the {@link SinkWriter}. + * @param <WriterStateT> The type of the {@link SinkWriter Writer's} state. 
+ */ +public final class SinkOperatorFactory<InputT, CommT, WriterStateT> Review comment: I agree but this class depends on the specific `CommitterHandler`s which I wanted to convert in this commit. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java ########## @@ -33,44 +30,52 @@ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link * Committer} in the batch execution mode. * - * @param <CommT> The committable type of the {@link Committer}. + * @param <InputT> The committable type of the {@link Committer}. */ -final class BatchCommitterOperator<CommT> extends AbstractStreamOperator<CommT> - implements OneInputStreamOperator<CommT, CommT>, BoundedOneInput { +final class BatchCommitterHandler<InputT, OutputT> + extends AbstractCommitterHandler<InputT, OutputT> { /** Responsible for committing the committable to the external system. */ - private final Committer<CommT> committer; + private final Committer<InputT> committer; - /** Record all the committables until the end of the input. */ - private final List<CommT> allCommittables; + /** + * The committer that is chained to this committer. It's either {@link + * GlobalBatchCommitterHandler} or {@link NoopCommitterHandler}. Review comment: The `NoopCommitterHandler` avoids null checks inside this class. It's a matter of flavor but in general a `NoopCommitterHandler` is the proper OOP approach (also dubbed case-less programming because of VMT). ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -161,6 +163,12 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil return Optional.empty(); } + @Override + public Collection<String> getCompatibleStateNames() { Review comment: I don't understand your comment. Why would you favor a `List` here? 
The only difference between a collection and a list is that the latter is implicitly ordered and indexed. None of that matters here. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ########## @@ -81,9 +83,17 @@ /** Return the serializer of the writer's state type. */ Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer(); + /** + * A list of state names of sinks from which the state can be restored. For example, the new + * file sink can resume from the state of an old streaming file sink as a drop-in replacement + * when resuming from a checkpoint/savepoint. + */ + default Collection<String> getCompatibleStateNames() { Review comment: I consider it more of a bugfix ;) ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link + * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states. + * + * @param <InputT> + * @param <OutputT> + */ +interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable { + + /** Initializes the state of the committer and this handler. */ + default void initializeState(StateInitializationContext context) throws Exception {} + + /** Snapshots the state of the committer and this handler. */ + default void snapshotState(StateSnapshotContext context) throws Exception {} + + /** + * Processes the committables be either directly transforming them or by adding them to the + * internal state of this handler. The supplier should only be queried once. + * + * @return a list of output committables that is send downstream. + */ + List<OutputT> processCommittables( + SupplierWithException<List<InputT>, IOException> committableSupplier) + throws IOException; + + /** + * Called when no more committables are going to be added through {@link + * #processCommittables(SupplierWithException)}. + * + * @return a list of output committables that is send downstream. + */ + default List<OutputT> endOfInput() throws IOException { + return Collections.emptyList(); + } + + default void close() throws Exception {} + + /** Called when a checkpoint is completed and returns a list of output to be sent downstream. 
*/ + default Collection<OutputT> notifyCheckpointCompleted(long checkpointId) throws IOException { + return Collections.emptyList(); + } + + /** Swallows all committables and emits nothing. */ + @SuppressWarnings("unchecked") + static <InputT, OutputT> CommitterHandler<InputT, OutputT> noop() { Review comment: I'll remove it. Originally, `CommitterHandler` was public and the composition happened in the SinkTranslater. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * CommitterOperator}. + * + * @param <CommT> the type of the committable + * @param <GlobalCommT> the type of the global committable + */ +public final class CommitterOperatorFactory<CommT, GlobalCommT> + extends AbstractStreamOperatorFactory<byte[]> + implements OneInputStreamOperatorFactory<byte[], byte[]> { + + private final Sink<?, CommT, ?, GlobalCommT> sink; + private final boolean batch; + + public CommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink, boolean batch) { + this.sink = sink; + this.batch = batch; + } + + @Override + @SuppressWarnings("unchecked") + public <T extends StreamOperator<byte[]>> T createStreamOperator( + StreamOperatorParameters<byte[]> parameters) { + + SimpleVersionedSerializer<CommT> committableSerializer = + sink.getCommittableSerializer().orElseThrow(this::noSerializerFound); + try { + CommitterHandler<CommT, GlobalCommT> committerHandler = getGlobalCommitterHandler(); + if (batch) { + Optional<Committer<CommT>> committer = sink.createCommitter(); + if (committer.isPresent()) { + committerHandler = + new BatchCommitterHandler<>(committer.get(), committerHandler); + } + } + + checkState( Review comment: It's always 
succeeding (`Preconditions#checkState` fails on `false`) in your case. However, it could fail if there is no global committer in streaming and it fails if there is neither a committer nor a global committer in batch. But this is a programming error in `SinkTranslater`; hence the `checkState`. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java ########## @@ -72,11 +58,12 @@ public void endInput() throws Exception { } } globalCommitter.endOfInput(); + return Collections.emptyList(); } @Override public void close() throws Exception { - super.close(); globalCommitter.close(); + super.close(); Review comment: You are supposed to close resources always in the opposite order of creation. That's also how try-with-resources and Guava's Closer work. The main idea is that your later resources may depend on the prior resources to finish properly. Here, it doesn't matter, but I'd still like to have it consistently correct in Flink. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandler.java ########## @@ -72,11 +58,12 @@ public void endInput() throws Exception { } } globalCommitter.endOfInput(); + return Collections.emptyList(); } @Override public void close() throws Exception { - super.close(); globalCommitter.close(); + super.close(); Review comment: You are supposed to close resources always in the opposite order of creation. That's also how try-with-resources and Guava's Closer work. The main idea is that your later resources may depend on the prior resources to finish properly. (Think of a transaction that flushes, so you'd close the transaction before the client). Here, it doesn't matter, but I'd still like to have it consistently correct in Flink. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/BatchCommitterHandler.java ########## @@ -33,44 +30,52 @@ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link * Committer} in the batch execution mode. * - * @param <CommT> The committable type of the {@link Committer}. + * @param <InputT> The committable type of the {@link Committer}. */ -final class BatchCommitterOperator<CommT> extends AbstractStreamOperator<CommT> - implements OneInputStreamOperator<CommT, CommT>, BoundedOneInput { +final class BatchCommitterHandler<InputT, OutputT> + extends AbstractCommitterHandler<InputT, OutputT> { /** Responsible for committing the committable to the external system. */ - private final Committer<CommT> committer; + private final Committer<InputT> committer; - /** Record all the committables until the end of the input. */ - private final List<CommT> allCommittables; + /** + * The committer that is chained to this committer. It's either {@link + * GlobalBatchCommitterHandler} or {@link NoopCommitterHandler}. Review comment: Here is some reference https://en.wikipedia.org/wiki/Null_object_pattern TL;DR `null` is an external construct in OOP. Also don't worry about performance here: First, we have a singleton pattern as an enum that's very efficient. Second, we are not on the hot path. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link + * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states. + * + * @param <InputT> + * @param <OutputT> + */ +interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable { + + /** Initializes the state of the committer and this handler. */ + default void initializeState(StateInitializationContext context) throws Exception {} + + /** Snapshots the state of the committer and this handler. */ + default void snapshotState(StateSnapshotContext context) throws Exception {} + + /** + * Processes the committables be either directly transforming them or by adding them to the + * internal state of this handler. The supplier should only be queried once. + * + * @return a list of output committables that is send downstream. + */ + List<OutputT> processCommittables( + SupplierWithException<List<InputT>, IOException> committableSupplier) + throws IOException; + + /** + * Called when no more committables are going to be added through {@link + * #processCommittables(SupplierWithException)}. 
+ * + * @return a list of output committables that is send downstream. + */ + default List<OutputT> endOfInput() throws IOException { + return Collections.emptyList(); + } + + default void close() throws Exception {} + + /** Called when a checkpoint is completed and returns a list of output to be sent downstream. */ + default Collection<OutputT> notifyCheckpointCompleted(long checkpointId) throws IOException { + return Collections.emptyList(); + } + + /** Swallows all committables and emits nothing. */ + @SuppressWarnings("unchecked") + static <InputT, OutputT> CommitterHandler<InputT, OutputT> noop() { Review comment: I can't fully remove it without introducing some ugliness: This method is for casting the singleton into the expected type. The `NoopCommitterHandler.INSTANCE` could actually be covariant to any type but you can't express that in Java. I have moved it to `NoopCommitterHandler#getInstance()`. PTAL if it's less confusing. I'm also promoting `NoopCommitterHandler` to top-level. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link + * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states. + * + * @param <InputT> + * @param <OutputT> + */ +interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable { + + /** Initializes the state of the committer and this handler. */ + default void initializeState(StateInitializationContext context) throws Exception {} + + /** Snapshots the state of the committer and this handler. */ + default void snapshotState(StateSnapshotContext context) throws Exception {} + + /** + * Processes the committables by either directly transforming them or by adding them to the + * internal state of this handler. The supplier should only be queried once. + * + * @return a list of output committables that is send downstream. + */ + List<OutputT> processCommittables( + SupplierWithException<List<InputT>, IOException> committableSupplier) + throws IOException; + + /** + * Called when no more committables are going to be added through {@link + * #processCommittables(SupplierWithException)}. + * + * @return a list of output committables that is send downstream. + */ + default List<OutputT> endOfInput() throws IOException { Review comment: Good questions. I double-checked and EOI on streaming is really just meant for flushing data. A final checkpoint happens afterwards (especially after FLIP-147). 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link + * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states. + * + * @param <InputT> + * @param <OutputT> + */ +interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable { + + /** Initializes the state of the committer and this handler. */ + default void initializeState(StateInitializationContext context) throws Exception {} + + /** Snapshots the state of the committer and this handler. 
*/ + default void snapshotState(StateSnapshotContext context) throws Exception {} + + /** + * Processes the committables by either directly transforming them or by adding them to the + * internal state of this handler. The supplier should only be queried once. + * + * @return a list of output committables that is send downstream. + */ + List<OutputT> processCommittables( + SupplierWithException<List<InputT>, IOException> committableSupplier) + throws IOException; + + /** + * Called when no more committables are going to be added through {@link + * #processCommittables(SupplierWithException)}. + * + * @return a list of output committables that is send downstream. + */ + default List<OutputT> endOfInput() throws IOException { Review comment: Good questions. I double-checked and EOI on streaming is really just meant for flushing data. A final checkpoint happens afterwards (especially after FLIP-147). Most likely we don't need that for Batch after FLIP-147 but that's a future task. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ########## @@ -139,6 +139,7 @@ public static void assertNoLateRecords(Iterable<Object> elements) { testHarness.processElements( input.stream().map(StreamRecord::new).collect(Collectors.toList())); + testHarness.prepareSnapshotPreBarrier(1); Review comment: Since writer+committer are merged, we now need to simulate more of the actual calls. The writer creates the committables on `prepareSnapshotPreBarrier` and passes them to the committer, which then does its actual work on `snapshotState`. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterHandler.java ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.util.function.SupplierWithException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * A wrapper around multiple {@link org.apache.flink.api.connector.sink.Committer} or {@link + * org.apache.flink.api.connector.sink.GlobalCommitter} that manages states. + * + * @param <InputT> + * @param <OutputT> + */ +interface CommitterHandler<InputT, OutputT> extends AutoCloseable, Serializable { + + /** Initializes the state of the committer and this handler. */ + default void initializeState(StateInitializationContext context) throws Exception {} + + /** Snapshots the state of the committer and this handler. */ + default void snapshotState(StateSnapshotContext context) throws Exception {} Review comment: They are called for all `CommitterHandler` though so we need to leave them here. Or we have to do instanceof in the operators. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ########## @@ -46,44 +54,81 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable { /** - * Create a {@link SinkWriter}. 
+ * Create a {@link SinkWriter}. If the application is resumed from a checkpoint or savepoint and + * the sink is stateful, it will receive the corresponding state obtained with {@link + * SinkWriter#snapshotState()} and serialized with {@link #getWriterStateSerializer()}. If no + * state exists, the first existing, compatible state specified in {@link + * #getCompatibleStateNames()} will be loaded and passed. * * @param context the runtime context. - * @param states the writer's state. + * @param states the writer's previous state. * @return A sink writer. - * @throws IOException if fail to create a writer. + * @throws IOException for any failure during creation. + * @see SinkWriter#snapshotState() + * @see #getWriterStateSerializer() + * @see #getCompatibleStateNames() */ SinkWriter<InputT, CommT, WriterStateT> createWriter( InitContext context, List<WriterStateT> states) throws IOException; /** - * Creates a {@link Committer}. + * Any stateful sink needs to provide this state serializer and implement {@link + * SinkWriter#snapshotState()} properly. The respective state is used in {@link + * #createWriter(InitContext, List)} on recovery. + * + * @return the serializer of the writer's state type. + */ + Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer(); + + /** + * Creates a {@link Committer} which is part of a 2-phase-commit protocol. The {@link + * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the + * first phase. The committables are then passed to this committer and persisted with {@link + * Committer#commit(List)}. If a committer is returned, the sink must also return a {@link + * #getCommittableSerializer()}. * - * @return A committer. - * @throws IOException if fail to create a committer. + * @return A committer for the 2-phase-commit protocol. + * @throws IOException for any failure during creation. 
*/ Optional<Committer<CommT>> createCommitter() throws IOException; /** - * Creates a {@link GlobalCommitter}. + * Creates a {@link GlobalCommitter} which is part of a 2-phase-commit protocol. The {@link + * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the + * first phase. The committables are then passed to the Committer and persisted with {@link + * Committer#commit(List)} which also can return an aggregated committable. This aggregated + * committables are passed to this {@link GlobalCommitter} of which only a single instance + * exists. If a global committer is returned, the sink must also return a {@link + * #getCommittableSerializer()} and {@link #getGlobalCommittableSerializer()}. * - * @return A global committer. - * @throws IOException if fail to create a global committer. + * @return A global committer for the 2-phase-commit protocol. + * @throws IOException for any failure during creation. */ Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException; - /** Returns the serializer of the committable type. */ + /** + * Returns the serializer of the committable type. The serializer is required iff the sink has a + * {@link Committer} or {@link GlobalCommitter}. + */ Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer(); - /** Returns the serializer of the aggregated committable type. */ + /** + * Returns the serializer of the aggregated committable type. The serializer is required iff the + * sink has a {@link GlobalCommitter}. + */ Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer(); - /** Return the serializer of the writer's state type. */ - Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer(); + /** + * A list of state names of sinks from which the state can be restored. 
For example, the new + * file sink can resume from the state of an old streaming file sink as a drop-in replacement Review comment: I can't exactly use a JavaDoc link because of circular dependency issues. But I have used the proper class names in the code tag to make it more obvious. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java ########## @@ -48,41 +54,70 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable { /** - * Create a {@link SinkWriter}. + * Create a {@link SinkWriter}. If the application is resumed from a checkpoint or savepoint and + * the sink is stateful, it will receive the corresponding state obtained with {@link + * SinkWriter#snapshotState()} and serialized with {@link #getWriterStateSerializer()}. If no + * state exists, the first existing, compatible state specified in {@link + * #getCompatibleStateNames()} will be loaded and passed. * * @param context the runtime context. - * @param states the writer's state. + * @param states the writer's previous state. * @return A sink writer. - * @throws IOException if fail to create a writer. + * @throws IOException for any failure during creation. + * @see SinkWriter#snapshotState() + * @see #getWriterStateSerializer() + * @see #getCompatibleStateNames() */ SinkWriter<InputT, CommT, WriterStateT> createWriter( InitContext context, List<WriterStateT> states) throws IOException; /** - * Creates a {@link Committer}. + * Any stateful sink needs to provide this state serializer and implement {@link + * SinkWriter#snapshotState()} properly. The respective state is used in {@link + * #createWriter(InitContext, List)} on recovery. + * + * @return the serializer of the writer's state type. + */ + Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer(); + + /** + * Creates a {@link Committer} which is part of a 2-phase-commit protocol. 
The {@link + * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the + * first phase. The committables are then passed to this committer and persisted with {@link + * Committer#commit(List)}. If a committer is returned, the sink must also return a {@link + * #getCommittableSerializer()}. * - * @return A committer. - * @throws IOException if fail to create a committer. + * @return A committer for the 2-phase-commit protocol. + * @throws IOException for any failure during creation. */ Optional<Committer<CommT>> createCommitter() throws IOException; /** - * Creates a {@link GlobalCommitter}. + * Creates a {@link GlobalCommitter} which is part of a 2-phase-commit protocol. The {@link + * SinkWriter} creates committables through {@link SinkWriter#prepareCommit(boolean)} in the + * first phase. The committables are then passed to the Committer and persisted with {@link + * Committer#commit(List)} which also can return an aggregated committable. This aggregated Review comment: You are right. Thanks for double-checking! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
