This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 073c9526a545baa727beef04c776ab12e82cbe24 Author: Aljoscha Krettek <[email protected]> AuthorDate: Fri Nov 27 12:41:17 2020 +0100 [FLINK-20337] Extend StatefulSinkWriterOperator Javadoc --- .../operators/sink/StatefulSinkWriterOperator.java | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java index 65b395c..769dc29 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java @@ -59,13 +59,21 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst /** The writer operator's state serializer. */ private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer; - /** The previous sink operator's state name. */ + /** + * The previous sink operator's state name. We allow restoring state from a different + * (compatible) sink implementation such as {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}. + * This allows migration to newer Sink implementations. + */ @Nullable private final String previousSinkStateName; // ------------------------------- runtime fields --------------------------------------- - /** The previous sink operator's state. */ + /** + * The previous sink operator's state. We allow restoring state from a different (compatible) + * sink implementation such as {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}. + * This allows migration to newer Sink implementations. + */ @Nullable private ListState<WriterStateT> previousSinkState; @@ -87,8 +95,12 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); - final ListState<byte[]> rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC); - writerState = new SimpleVersionedListState<>(rawState, writerStateSimpleVersionedSerializer); + final ListState<byte[]> rawState = context + .getOperatorStateStore() + .getListState(WRITER_RAW_STATES_DESC); + writerState = new SimpleVersionedListState<>( + rawState, + writerStateSimpleVersionedSerializer); if (previousSinkStateName != null) { final ListStateDescriptor<byte[]> preSinkStateDesc = new ListStateDescriptor<>(
