Repository: flink Updated Branches: refs/heads/master 3e3a90d89 -> 4656350fc
[FLINK-5000] Rename Methods in ManagedInitializationContext This removes "managed" from the OperatorStateStore and KeyedStateStore access methods. There is no "un-managed" state and users might be wondering what "managed" means here. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4656350f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4656350f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4656350f Branch: refs/heads/master Commit: 4656350fc33d42ff96ad6d5e836e62172b4b0de6 Parents: 3e3a90d Author: Aljoscha Krettek <[email protected]> Authored: Wed Nov 23 14:58:33 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Nov 23 16:13:29 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/state/ManagedInitializationContext.java | 4 ++-- .../flink/runtime/state/StateInitializationContextImpl.java | 6 +++--- .../streaming/connectors/fs/bucketing/BucketingSink.java | 2 +- .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 2 +- .../streaming/connectors/kafka/FlinkKafkaProducerBase.java | 2 +- .../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 8 ++++---- .../api/functions/source/ContinuousFileReaderOperator.java | 2 +- .../streaming/api/operators/AbstractUdfStreamOperator.java | 2 +- .../api/operators/StreamOperatorSnapshotRestoreTest.java | 8 ++------ .../org/apache/flink/test/checkpointing/RescalingITCase.java | 6 +++--- 10 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index abc528b..5255c43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -43,11 +43,11 @@ public interface ManagedInitializationContext { /** * Returns an interface that allows for registering operator state with the backend. */ - OperatorStateStore getManagedOperatorStateStore(); + OperatorStateStore getOperatorStateStore(); /** * Returns an interface that allows for registering keyed state with the backend. */ - KeyedStateStore getManagedKeyedStateStore(); + KeyedStateStore getKeyedStateStore(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index b131d14..c86ff6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -122,12 +122,12 @@ public class StateInitializationContextImpl implements StateInitializationContex } @Override - public OperatorStateStore getManagedOperatorStateStore() { + public OperatorStateStore getOperatorStateStore() { return operatorStateStore; } @Override - public KeyedStateStore getManagedKeyedStateStore() { + public KeyedStateStore getKeyedStateStore() { return keyedStateStore; } @@ -268,4 +268,4 @@ public class StateInitializationContextImpl implements StateInitializationContex throw new UnsupportedOperationException("Read only Iterator"); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 1da56b4..cf2c373 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -350,7 +350,7 @@ public class BucketingSink<T> this.refTruncate = reflectTruncate(fs); } - OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + OperatorStateStore stateStore = context.getOperatorStateStore(); restoredBucketStates = stateStore.getSerializableListState("bucket-states"); int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 5161b35..aef7116 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -314,7 +314,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti @Override public void initializeState(FunctionInitializationContext context) throws Exception { - OperatorStateStore stateStore = context.getManagedOperatorStateStore(); + OperatorStateStore stateStore = context.getOperatorStateStore(); offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); if (context.isRestored()) { http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index 33289f8..d413f1c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -343,7 +343,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im @Override public void initializeState(FunctionInitializationContext context) throws Exception { - this.stateStore = context.getManagedOperatorStateStore(); + this.stateStore = context.getOperatorStateStore(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 9b7eabf..b96ba30 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -132,7 +132,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); when(initializationContext.isRestored()).thenReturn(true); consumer.initializeState(initializationContext); @@ -172,7 +172,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); when(initializationContext.isRestored()).thenReturn(false); consumer.initializeState(initializationContext); @@ -199,7 +199,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); // make the context signal that there is no restored state, then validate that when(initializationContext.isRestored()).thenReturn(false); @@ -245,7 +245,7 @@ public class FlinkKafkaConsumerBaseTest { StateInitializationContext initializationContext = mock(StateInitializationContext.class); - when(initializationContext.getManagedOperatorStateStore()).thenReturn(backend); + when(initializationContext.getOperatorStateStore()).thenReturn(backend); when(initializationContext.isRestored()).thenReturn(false, true, true, true); consumer.initializeState(initializationContext); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 74c58f9..bbe1ea5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -92,7 +92,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU checkState(this.checkpointedState == null && this.restoredReaderState == null, "The reader state has already been initialized."); - checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits"); + checkpointedState = context.getOperatorStateStore().getSerializableListState("splits"); int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); if (context.isRestored()) { http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 72ed5dc..c1f783f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -130,7 +130,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> @SuppressWarnings("unchecked") ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; - ListState<Serializable> listState = context.getManagedOperatorStateStore(). + ListState<Serializable> listState = context.getOperatorStateStore(). getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); List<Serializable> list = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index c02a7c3..08fbcbe 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -28,10 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -39,13 +37,11 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.util.FutureUtil; import org.junit.Assert; import org.junit.Test; import java.io.InputStream; import java.util.BitSet; -import java.util.Collections; public class StreamOperatorSnapshotRestoreTest { @@ -173,8 +169,8 @@ public class StreamOperatorSnapshotRestoreTest { Assert.assertEquals(verifyRestore, context.isRestored()); - keyedState = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); - opState = context.getManagedOperatorStateStore().getSerializableListState("managed-op-state"); + keyedState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("managed-keyed", Integer.class, 0)); + opState = context.getOperatorStateStore().getSerializableListState("managed-op-state"); if (context.isRestored()) { // check restored raw keyed state http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 09de67f..bc65abf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -792,8 +792,8 @@ public class RescalingITCase extends TestLogger { @Override public void initializeState(FunctionInitializationContext context) throws Exception { - counter = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0)); - sum = context.getManagedKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0)); + counter = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0)); + sum = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("sum", Integer.class, 0)); } } @@ -937,7 +937,7 @@ public class RescalingITCase extends TestLogger { @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.counterPartitions = - context.getManagedOperatorStateStore().getSerializableListState("counter_partitions"); + context.getOperatorStateStore().getSerializableListState("counter_partitions"); if (context.isRestored()) { for (int v : counterPartitions.get()) { counter += v;
