This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 073c9526a545baa727beef04c776ab12e82cbe24
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Fri Nov 27 12:41:17 2020 +0100

    [FLINK-20337] Extend StatefulSinkWriterOperator Javadoc
---
 .../operators/sink/StatefulSinkWriterOperator.java   | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java
index 65b395c..769dc29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulSinkWriterOperator.java
@@ -59,13 +59,21 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst
        /** The writer operator's state serializer. */
        private final SimpleVersionedSerializer<WriterStateT> 
writerStateSimpleVersionedSerializer;
 
-       /** The previous sink operator's state name. */
+       /**
+        * The previous sink operator's state name. We allow restoring state from a different
+        * (compatible) sink implementation such as {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
+        * This allows migration to newer Sink implementations.
+        */
        @Nullable
        private final String previousSinkStateName;
 
        // ------------------------------- runtime fields 
---------------------------------------
 
-       /** The previous sink operator's state. */
+       /**
+        * The previous sink operator's state. We allow restoring state from a different (compatible)
+        * sink implementation such as {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
+        * This allows migration to newer Sink implementations.
+        */
        @Nullable
        private ListState<WriterStateT> previousSinkState;
 
@@ -87,8 +95,12 @@ final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends Abst
        public void initializeState(StateInitializationContext context) throws 
Exception {
                super.initializeState(context);
 
-               final ListState<byte[]> rawState = 
context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
-               writerState = new SimpleVersionedListState<>(rawState, 
writerStateSimpleVersionedSerializer);
+               final ListState<byte[]> rawState = context
+                               .getOperatorStateStore()
+                               .getListState(WRITER_RAW_STATES_DESC);
+               writerState = new SimpleVersionedListState<>(
+                               rawState,
+                               writerStateSimpleVersionedSerializer);
 
                if (previousSinkStateName != null) {
                        final ListStateDescriptor<byte[]> preSinkStateDesc = 
new ListStateDescriptor<>(

Reply via email to