Repository: flink
Updated Branches:
  refs/heads/master 8b1b4a1cc -> 6a86e9d62


Hide broadcast state / remove from public API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a86e9d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a86e9d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a86e9d6

Branch: refs/heads/master
Commit: 6a86e9d62045386b76026f4deead8baa559f008e
Parents: 1020ba2
Author: Stefan Richter <[email protected]>
Authored: Fri Jan 13 16:38:33 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Jan 13 21:29:19 2017 +0100

----------------------------------------------------------------------
 .../api/common/state/OperatorStateStore.java    | 27 --------------------
 .../state/DefaultOperatorStateBackend.java      |  2 --
 .../runtime/state/OperatorStateBackendTest.java | 12 +++++----
 .../test/checkpointing/RescalingITCase.java     |  5 +++-
 4 files changed, 11 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a86e9d6/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index 87a7759..c1cdfe4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -57,33 +57,6 @@ public interface OperatorStateStore {
        <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
 
        /**
-        * Creates (or restores) a list state. Each state is registered under a unique name.
-        * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-        *
-        * On restore, all items in the list are broadcasted to all parallel operator instances.
-        *
-        * @param stateDescriptor The descriptor for this state, providing a name and serializer.
-        * @param <S> The generic type of the state
-        *
-        * @return A list for all state partitions.
-        * @throws Exception
-        */
-       <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-       /**
-        * Creates a state of the given name that uses Java serialization to persist the state. On restore, all items
-        * in the list are broadcasted to all parallel operator instances.
-        *
-        * <p>This is a simple convenience method. For more flexibility on how state serialization
-        * should happen, use the {@link #getBroadcastOperatorState(ListStateDescriptor)} method.
-        *
-        * @param stateName The name of state to create
-        * @return A list state using Java serialization to serialize state objects.
-        * @throws Exception
-        */
-       <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception;
-
-       /**
         * Returns a set with the names of all currently registered states.
         * @return set of names for all registered states.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/6a86e9d6/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 6c65088..1cd1da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -91,12 +91,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
        }
 
        @SuppressWarnings("unchecked")
-       @Override
        public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
                return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
        }
 
-       @Override
        public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
                return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a86e9d6/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index cd0391f..5bd085f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -45,8 +45,9 @@ public class OperatorStateBackendTest {
                return env;
        }
 
-       private OperatorStateBackend createNewOperatorStateBackend() throws Exception {
-               return abstractStateBackend.createOperatorStateBackend(
+       private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
+               //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+               return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
                                createMockEnvironment(),
                                "test-operator");
        }
@@ -60,7 +61,7 @@ public class OperatorStateBackendTest {
 
        @Test
        public void testRegisterStates() throws Exception {
-               OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+               DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -143,7 +144,7 @@ public class OperatorStateBackendTest {
 
        @Test
        public void testSnapshotRestore() throws Exception {
-               OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
+               DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
                ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
                ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
@@ -171,7 +172,8 @@ public class OperatorStateBackendTest {
                        operatorStateBackend.close();
                        operatorStateBackend.dispose();
 
-                       operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
+                       //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+                       operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
                                        createMockEnvironment(),
                                        "testOperator");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6a86e9d6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 45fcc25..bd1678e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -969,8 +970,10 @@ public class RescalingITCase extends TestLogger {
                public void initializeState(FunctionInitializationContext context) throws Exception {
 
                        if (broadcast) {
+                               //TODO this is temporarily casted to test already functionality that we do not yet expose through public API
+                               DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore();
                                this.counterPartitions =
-                                               context.getOperatorStateStore().getBroadcastSerializableListState("counter_partitions");
+                                               operatorStateStore.getBroadcastSerializableListState("counter_partitions");
                        } else {
                                this.counterPartitions =
                                                context.getOperatorStateStore().getSerializableListState("counter_partitions");

Reply via email to