Repository: flink
Updated Branches:
  refs/heads/master 3e3a90d89 -> 4656350fc


[FLINK-5000] Rename Methods in ManagedInitializationContext

This removes the "managed" prefix from the names of the OperatorStateStore
and KeyedStateStore access methods. There is no "un-managed" state, so the
prefix might leave users wondering what "managed" means here.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4656350f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4656350f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4656350f

Branch: refs/heads/master
Commit: 4656350fc33d42ff96ad6d5e836e62172b4b0de6
Parents: 3e3a90d
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Nov 23 14:58:33 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Nov 23 16:13:29 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/state/ManagedInitializationContext.java    | 4 ++--
 .../flink/runtime/state/StateInitializationContextImpl.java  | 6 +++---
 .../streaming/connectors/fs/bucketing/BucketingSink.java     | 2 +-
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java   | 2 +-
 .../streaming/connectors/kafka/FlinkKafkaProducerBase.java   | 2 +-
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java         | 8 ++++----
 .../api/functions/source/ContinuousFileReaderOperator.java   | 2 +-
 .../streaming/api/operators/AbstractUdfStreamOperator.java   | 2 +-
 .../api/operators/StreamOperatorSnapshotRestoreTest.java     | 8 ++------
 .../org/apache/flink/test/checkpointing/RescalingITCase.java | 6 +++---
 10 files changed, 19 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
index abc528b..5255c43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
@@ -43,11 +43,11 @@ public interface ManagedInitializationContext {
        /**
         * Returns an interface that allows for registering operator state with the backend.
         */
-       OperatorStateStore getManagedOperatorStateStore();
+       OperatorStateStore getOperatorStateStore();
 
        /**
         * Returns an interface that allows for registering keyed state with the backend.
         */
-       KeyedStateStore getManagedKeyedStateStore();
+       KeyedStateStore getKeyedStateStore();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index b131d14..c86ff6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -122,12 +122,12 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
        }
 
        @Override
-       public OperatorStateStore getManagedOperatorStateStore() {
+       public OperatorStateStore getOperatorStateStore() {
                return operatorStateStore;
        }
 
        @Override
-       public KeyedStateStore getManagedKeyedStateStore() {
+       public KeyedStateStore getKeyedStateStore() {
                return keyedStateStore;
        }
 
@@ -268,4 +268,4 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                        throw new UnsupportedOperationException("Read only 
Iterator");
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 1da56b4..cf2c373 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -350,7 +350,7 @@ public class BucketingSink<T>
                        this.refTruncate = reflectTruncate(fs);
                }
 
-               OperatorStateStore stateStore = 
context.getManagedOperatorStateStore();
+               OperatorStateStore stateStore = context.getOperatorStateStore();
                restoredBucketStates = 
stateStore.getSerializableListState("bucket-states");
 
                int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 5161b35..aef7116 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -314,7 +314,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
 
-               OperatorStateStore stateStore = 
context.getManagedOperatorStateStore();
+               OperatorStateStore stateStore = context.getOperatorStateStore();
                offsetsStateForCheckpoint = 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
                if (context.isRestored()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 33289f8..d413f1c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -343,7 +343,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
 
        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
-               this.stateStore = context.getManagedOperatorStateStore();
+               this.stateStore = context.getOperatorStateStore();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 9b7eabf..b96ba30 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -132,7 +132,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
 
-               
when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
                when(initializationContext.isRestored()).thenReturn(true);
 
                consumer.initializeState(initializationContext);
@@ -172,7 +172,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
 
-               
when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
                when(initializationContext.isRestored()).thenReturn(false);
 
                consumer.initializeState(initializationContext);
@@ -199,7 +199,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
 
-               
when(initializationContext.getManagedOperatorStateStore()).thenReturn(operatorStateStore);
+               
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
 
                // make the context signal that there is no restored state, 
then validate that
                when(initializationContext.isRestored()).thenReturn(false);
@@ -245,7 +245,7 @@ public class FlinkKafkaConsumerBaseTest {
 
                StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
 
-               
when(initializationContext.getManagedOperatorStateStore()).thenReturn(backend);
+               
when(initializationContext.getOperatorStateStore()).thenReturn(backend);
                when(initializationContext.isRestored()).thenReturn(false, 
true, true, true);
 
                consumer.initializeState(initializationContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 74c58f9..bbe1ea5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -92,7 +92,7 @@ public class ContinuousFileReaderOperator<OUT> extends 
AbstractStreamOperator<OU
                checkState(this.checkpointedState == null && 
this.restoredReaderState == null,
                        "The reader state has already been initialized.");
 
-               checkpointedState = 
context.getManagedOperatorStateStore().getSerializableListState("splits");
+               checkpointedState = 
context.getOperatorStateStore().getSerializableListState("splits");
 
                int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
                if (context.isRestored()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 72ed5dc..c1f783f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -130,7 +130,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function>
                        @SuppressWarnings("unchecked")
                        ListCheckpointed<Serializable> listCheckpointedFun = 
(ListCheckpointed<Serializable>) userFunction;
 
-                       ListState<Serializable> listState = 
context.getManagedOperatorStateStore().
+                       ListState<Serializable> listState = 
context.getOperatorStateStore().
                                        
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
                        List<Serializable> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index c02a7c3..08fbcbe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -28,10 +28,8 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -39,13 +37,11 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.InputStream;
 import java.util.BitSet;
-import java.util.Collections;
 
 public class StreamOperatorSnapshotRestoreTest {
 
@@ -173,8 +169,8 @@ public class StreamOperatorSnapshotRestoreTest {
 
                        Assert.assertEquals(verifyRestore, 
context.isRestored());
 
-                       keyedState = 
context.getManagedKeyedStateStore().getState(new 
ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
-                       opState = 
context.getManagedOperatorStateStore().getSerializableListState("managed-op-state");
+                       keyedState = context.getKeyedStateStore().getState(new 
ValueStateDescriptor<>("managed-keyed", Integer.class, 0));
+                       opState = 
context.getOperatorStateStore().getSerializableListState("managed-op-state");
 
                        if (context.isRestored()) {
                                // check restored raw keyed state

http://git-wip-us.apache.org/repos/asf/flink/blob/4656350f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 09de67f..bc65abf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -792,8 +792,8 @@ public class RescalingITCase extends TestLogger {
 
                @Override
                public void initializeState(FunctionInitializationContext 
context) throws Exception {
-                       counter = 
context.getManagedKeyedStateStore().getState(new 
ValueStateDescriptor<>("counter", Integer.class, 0));
-                       sum = context.getManagedKeyedStateStore().getState(new 
ValueStateDescriptor<>("sum", Integer.class, 0));
+                       counter = context.getKeyedStateStore().getState(new 
ValueStateDescriptor<>("counter", Integer.class, 0));
+                       sum = context.getKeyedStateStore().getState(new 
ValueStateDescriptor<>("sum", Integer.class, 0));
                }
        }
 
@@ -937,7 +937,7 @@ public class RescalingITCase extends TestLogger {
                @Override
                public void initializeState(FunctionInitializationContext 
context) throws Exception {
                        this.counterPartitions =
-                                       
context.getManagedOperatorStateStore().getSerializableListState("counter_partitions");
+                                       
context.getOperatorStateStore().getSerializableListState("counter_partitions");
                        if (context.isRestored()) {
                                for (int v : counterPartitions.get()) {
                                        counter += v;

Reply via email to