This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a3516d7d0094a9089a077b728b47d3607632177 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Thu May 28 15:10:21 2020 +0200 [FLINK-17376] Use JavaSerializer instead of getSerializableListState() We do this because we want to deprecate that method. We will have to get rid of using JavaSerialization completely soon, though. --- .../connectors/fs/bucketing/BucketingSink.java | 8 +++++++- .../fs/bucketing/BucketingSinkMigrationTest.java | 8 +++++++- .../source/ContinuousFileReaderOperator.java | 8 +++++++- .../source/MessageAcknowledgingSourceBase.java | 8 +++++++- .../runtime/operators/GenericWriteAheadSink.java | 11 +++++++++-- .../util/functions/StreamingFunctionUtils.java | 20 ++++++++++++++++---- 6 files changed, 53 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 78cefaf..ad598a2 100644 --- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -387,8 +389,12 @@ public class BucketingSink<T> this.refTruncate = reflectTruncate(fs); } + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. OperatorStateStore stateStore = context.getOperatorStateStore(); - restoredBucketStates = stateStore.getSerializableListState("bucket-states"); + this.restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>())); int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); if (context.isRestored()) { diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java index 5bd0fcf..cfa68dc 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java @@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.fs.bucketing; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -204,7 +206,11 @@ public class BucketingSinkMigrationTest { public void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); - ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states"); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. + ListState<State<T>> restoredBucketStates = stateStore.getListState(new ListStateDescriptor<>("bucket-states", new JavaSerializer<>())); if (context.isRestored()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 87a028c..1b050f5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -23,10 +23,12 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -243,7 +245,11 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> checkState(checkpointedState == null, "The reader state has already been initialized."); - checkpointedState = context.getOperatorStateStore().getSerializableListState("splits"); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. + checkpointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>())); int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); if (!context.isRestored()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 3a2a5ca..5b99194 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -28,6 +29,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.util.Preconditions; @@ -138,9 +140,13 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId> Preconditions.checkState(this.checkpointedState == null, "The " + getClass().getSimpleName() + " has already been initialized."); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. this.checkpointedState = context .getOperatorStateStore() - .getSerializableListState("message-acknowledging-source-state"); + .getListState(new ListStateDescriptor<>("message-acknowledging-source-state", new JavaSerializer<>())); this.idsForCurrentCheckpoint = new HashSet<>(64); this.pendingCheckpoints = new ArrayDeque<>(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 4ad0fc6..889cec1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -26,6 +27,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.io.disk.InputViewIterator; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -92,8 +94,13 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I Preconditions.checkState(this.checkpointedState == null, "The reader state has already been initialized."); - checkpointedState = context.getOperatorStateStore() - .getSerializableListState("pending-checkpoints"); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. + checkpointedState = context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("pending-checkpoints", new JavaSerializer<>())); int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); if (context.isRestored()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java index 4482431..d9ea561 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java @@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; +import org.apache.flink.runtime.state.JavaSerializer; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -125,8 +127,13 @@ public final class StreamingFunctionUtils { List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction). snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp()); - ListState<Serializable> listState = backend. - getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. + ListStateDescriptor<Serializable> listStateDescriptor = + new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>()); + ListState<Serializable> listState = backend.getListState(listStateDescriptor); listState.clear(); @@ -184,8 +191,13 @@ public final class StreamingFunctionUtils { @SuppressWarnings("unchecked") ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; - ListState<Serializable> listState = context.getOperatorStateStore(). - getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); + // We are using JavaSerializer from the flink-runtime module here. This is very naughty and + // we shouldn't be doing it because ideally nothing in the API modules/connector depends + // directly on flink-runtime. We are doing it here because we need to maintain backwards + // compatibility with old state and because we will have to rework/remove this code soon. + ListStateDescriptor<Serializable> listStateDescriptor = + new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, new JavaSerializer<>()); + ListState<Serializable> listState = context.getOperatorStateStore().getListState(listStateDescriptor); List<Serializable> list = new ArrayList<>();