This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6a3516d7d0094a9089a077b728b47d3607632177
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Thu May 28 15:10:21 2020 +0200

    [FLINK-17376] Use JavaSerializer instead of getSerializableListState()
    
    We do this because we want to deprecate that method. We will have to
    get rid of Java serialization completely soon, though.
---
 .../connectors/fs/bucketing/BucketingSink.java       |  8 +++++++-
 .../fs/bucketing/BucketingSinkMigrationTest.java     |  8 +++++++-
 .../source/ContinuousFileReaderOperator.java         |  8 +++++++-
 .../source/MessageAcknowledgingSourceBase.java       |  8 +++++++-
 .../runtime/operators/GenericWriteAheadSink.java     | 11 +++++++++--
 .../util/functions/StreamingFunctionUtils.java       | 20 ++++++++++++++++----
 6 files changed, 53 insertions(+), 10 deletions(-)
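
For context (this note is not part of the commit itself): every file below gets the same mechanical change, replacing the soon-to-be-deprecated OperatorStateStore#getSerializableListState(String) call with an explicit ListStateDescriptor backed by JavaSerializer, so that the on-disk serialization format, and therefore old state, stays compatible. The following is only a minimal sketch of that pattern in a hypothetical CheckpointedFunction; the class and state names are illustrative and do not appear in the commit.

// Hypothetical example of the migration pattern applied in this commit.
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class ExampleStatefulFunction implements CheckpointedFunction {

	private transient ListState<MyState> checkpointedState;
	private final List<MyState> localState = new ArrayList<>();

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		OperatorStateStore stateStore = context.getOperatorStateStore();

		// Before this commit:
		// checkpointedState = stateStore.getSerializableListState("example-state");

		// After this commit: an explicit descriptor with JavaSerializer keeps the
		// serialization format (and therefore previously written state) compatible.
		checkpointedState = stateStore.getListState(
			new ListStateDescriptor<>("example-state", new JavaSerializer<>()));

		if (context.isRestored()) {
			for (MyState restored : checkpointedState.get()) {
				localState.add(restored);
			}
		}
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// Replace the checkpointed contents with the current local state.
		checkpointedState.clear();
		checkpointedState.addAll(localState);
	}

	/** Placeholder state type; JavaSerializer requires it to be Serializable. */
	public static class MyState implements Serializable {
		private static final long serialVersionUID = 1L;
	}
}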

diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 78cefaf..ad598a2 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -29,6 +30,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -387,8 +389,12 @@ public class BucketingSink<T>
                        this.refTruncate = reflectTruncate(fs);
                }
 
+               // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+               // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+               // directly on flink-runtime. We are doing it here because we need to maintain backwards
+               // compatibility with old state and because we will have to rework/remove this code soon.
                OperatorStateStore stateStore = context.getOperatorStateStore();
-               restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+               this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                if (context.isRestored()) {
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 5bd0fcf..cfa68dc 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.fs.bucketing;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -204,7 +206,11 @@ public class BucketingSinkMigrationTest {
                public void initializeState(FunctionInitializationContext context) throws Exception {
                        OperatorStateStore stateStore = context.getOperatorStateStore();
 
-                       ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
+                       // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+                       // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+                       // directly on flink-runtime. We are doing it here because we need to maintain backwards
+                       // compatibility with old state and because we will have to rework/remove this code soon.
+                       ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>()));
 
                        if (context.isRestored()) {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 87a028c..1b050f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -23,10 +23,12 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -243,7 +245,11 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit>
 
                checkState(checkpointedState == null, "The reader state has already been initialized.");
 
-               checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
+               // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+               // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+               // directly on flink-runtime. We are doing it here because we need to maintain backwards
+               // compatibility with old state and because we will have to rework/remove this code soon.
+               checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));
 
                int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
                if (!context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 3a2a5ca..5b99194 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -138,9 +140,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
                Preconditions.checkState(this.checkpointedState == null,
                        "The " + getClass().getSimpleName() + " has already been initialized.");
 
+               // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+               // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+               // directly on flink-runtime. We are doing it here because we need to maintain backwards
+               // compatibility with old state and because we will have to rework/remove this code soon.
                this.checkpointedState = context
                        .getOperatorStateStore()
-                       .getSerializableListState("message-acknowledging-source-state");
+                       .getListState(new ListStateDescriptor<>("message-acknowledging-source-state", new JavaSerializer<>()));
 
                this.idsForCurrentCheckpoint = new HashSet<>(64);
                this.pendingCheckpoints = new ArrayDeque<>();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4ad0fc6..889cec1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -26,6 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -92,8 +94,13 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
                Preconditions.checkState(this.checkpointedState == null,
                        "The reader state has already been initialized.");
 
-               checkpointedState = context.getOperatorStateStore()
-                       .getSerializableListState("pending-checkpoints");
+               // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+               // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+               // directly on flink-runtime. We are doing it here because we need to maintain backwards
+               // compatibility with old state and because we will have to rework/remove this code soon.
+               checkpointedState = context
+                                                       .getOperatorStateStore()
+                                                       .getListState(new ListStateDescriptor<>("pending-checkpoints", new JavaSerializer<>()));
 
                int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
                if (context.isRestored()) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
index 4482431..d9ea561 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
@@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -125,8 +127,13 @@ public final class StreamingFunctionUtils {
                        List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                                        snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
 
-                       ListState<Serializable> listState = backend.
-                                       getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+                       // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+                       // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+                       // directly on flink-runtime. We are doing it here because we need to maintain backwards
+                       // compatibility with old state and because we will have to rework/remove this code soon.
+                       ListStateDescriptor<Serializable> listStateDescriptor =
+                               new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+                       ListState<Serializable> listState = backend.getListState(listStateDescriptor);
 
                        listState.clear();
 
@@ -184,8 +191,13 @@ public final class StreamingFunctionUtils {
                        @SuppressWarnings("unchecked")
                        ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
 
-                       ListState<Serializable> listState = context.getOperatorStateStore().
-                                       getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
+                       // We are using JavaSerializer from the flink-runtime module here. This is very naughty and
+                       // we shouldn't be doing it because ideally nothing in the API modules/connector depends
+                       // directly on flink-runtime. We are doing it here because we need to maintain backwards
+                       // compatibility with old state and because we will have to rework/remove this code soon.
+                       ListStateDescriptor<Serializable> listStateDescriptor =
+                               new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>());
+                       ListState<Serializable> listState = context.getOperatorStateStore().getListState(listStateDescriptor);
 
                        List<Serializable> list = new ArrayList<>();
 
