Repository: flink
Updated Branches:
  refs/heads/master 680c2c3ec -> 1198664cb


[FLINK-3201] Add operator state to make change backwards compatible


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f755961
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f755961
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f755961

Branch: refs/heads/master
Commit: 6f75596112ba5d36d88cc7c2de74acee8f683ca9
Parents: 524e56b
Author: Stephan Ewen <[email protected]>
Authored: Fri Jan 29 10:33:19 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:27:51 2016 +0100

----------------------------------------------------------------------
 .../state/DBStateCheckpointingTest.java         | 28 +++++++++++++-------
 .../api/common/functions/RuntimeContext.java    | 17 +++++++-----
 .../util/AbstractRuntimeUDFContext.java         |  5 ++--
 .../flink/api/common/state/OperatorState.java   | 23 ++++++++--------
 .../apache/flink/api/common/state/State.java    |  5 ++++
 .../flink/api/common/state/StateDescriptor.java |  2 +-
 .../kafka/testutils/MockRuntimeContext.java     |  3 ++-
 .../api/operators/StreamingRuntimeContext.java  |  6 ++---
 .../streaming/runtime/tasks/StreamTask.java     |  2 +-
 .../flink/streaming/api/scala/KeyedStream.scala | 10 ++++---
 .../api/scala/function/StatefulFunction.scala   | 13 +++++----
 .../EventTimeWindowCheckpointingITCase.java     |  9 ++++---
 .../PartitionedStateCheckpointingITCase.java    | 25 +++++++++--------
 .../StreamCheckpointingITCase.java              | 10 ++++---
 14 files changed, 97 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
index 87dabf8..3adf5aa 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
@@ -32,9 +32,13 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.derby.drda.NetworkServerControl;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -46,6 +50,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import 
org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector;
 import 
org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+
 import org.junit.After;
 import org.junit.Before;
 
@@ -182,7 +187,7 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
 
        private static class OnceFailingPartitionedSum extends 
RichMapFunction<Integer, Tuple2<Integer, Long>> {
 
-               private static Map<Integer, Long> allSums = new 
ConcurrentHashMap<Integer, Long>();
+               private static Map<Integer, Long> allSums = new 
ConcurrentHashMap<>();
 
                private static volatile boolean hasFailed = false;
 
@@ -191,7 +196,7 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
                private long failurePos;
                private long count;
 
-               private OperatorState<Long> sum;
+               private ValueState<Long> sum;
 
                OnceFailingPartitionedSum(long numElements) {
                        this.numElements = numElements;
@@ -204,7 +209,8 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
 
                        failurePos = (new Random().nextLong() % (failurePosMax 
- failurePosMin)) + failurePosMin;
                        count = 0;
-                       sum = getRuntimeContext().getKeyValueState("my_state", 
Long.class, 0L);
+                       sum = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("my_state", 
0L, LongSerializer.INSTANCE));
                }
 
                @Override
@@ -224,15 +230,19 @@ public class DBStateCheckpointingTest extends 
StreamFaultToleranceTestBase {
 
        private static class CounterSink extends 
RichSinkFunction<Tuple2<Integer, Long>> {
 
-               private static Map<Integer, Long> allCounts = new 
ConcurrentHashMap<Integer, Long>();
+               private static Map<Integer, Long> allCounts = new 
ConcurrentHashMap<>();
 
-               private OperatorState<NonSerializableLong> aCounts;
-               private OperatorState<Long> bCounts;
+               private ValueState<NonSerializableLong> aCounts;
+               private ValueState<Long> bCounts;
 
                @Override
                public void open(Configuration parameters) throws IOException {
-                       aCounts = getRuntimeContext().getKeyValueState("a", 
NonSerializableLong.class, NonSerializableLong.of(0L));
-                       bCounts = getRuntimeContext().getKeyValueState("b", 
Long.class, 0L);
+                       aCounts = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("a", 
NonSerializableLong.of(0L), 
+                                                       new 
KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
+                       
+                       bCounts = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("b", 0L, 
LongSerializer.INSTANCE));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a419d1e..d37f7eb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -145,6 +146,7 @@ public interface RuntimeContext {
        /**
         * Convenience function to create a counter object for histograms.
         */
+       @Experimental
        Histogram getHistogram(String name);
        
        // 
--------------------------------------------------------------------------------------------
@@ -208,13 +210,11 @@ public interface RuntimeContext {
         *
         * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
         *
-        *     private ValueStateDescriptor<Long> countIdentifier =
-        *         new ValueStateDescriptor<>("count", 0L, 
LongSerializer.INSTANCE);
-        *
         *     private ValueState<Long> count;
         *
         *     public void open(Configuration cfg) {
-        *         state = 
getRuntimeContext().getPartitionedState(countIdentifier);
+        *         state = getRuntimeContext().getPartitionedState(
+        *                 new ValueStateDescriptor<Long>("count", 0L, 
LongSerializer.INSTANCE));
         *     }
         *
         *     public Tuple2<MyType, Long> map(MyType value) {
@@ -291,9 +291,11 @@ public interface RuntimeContext {
         *
         * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
         *                                       function (function is not part 
os a KeyedStream).
+        * 
+        * @deprecated Use the more expressive {@link 
#getPartitionedState(StateDescriptor)} instead.
         */
        @Deprecated
-       <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S 
defaultState);
+       <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, 
S defaultState);
 
        /**
         * Gets the key/value state, which is only accessible if the function 
is executed on
@@ -330,7 +332,6 @@ public interface RuntimeContext {
         *     
         * }</pre>
         * 
-        *
         * @param name The name of the key/value state.
         * @param stateType The type information for the type that is stored in 
the state.
         *                  Used to create serializers for managed memory and 
checkpoints.
@@ -342,7 +343,9 @@ public interface RuntimeContext {
         * 
         * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
         *                                       function (function is not part 
os a KeyedStream).
+        * 
+        * @deprecated Use the more expressive {@link 
#getPartitionedState(StateDescriptor)} instead.
         */
        @Deprecated
-       <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
+       <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index fe18994..45ef179 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -179,14 +180,14 @@ public abstract class AbstractRuntimeUDFContext 
implements RuntimeContext {
 
        @Override
        @Deprecated
-       public <S> ValueState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
+       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
 
        @Override
        @Deprecated
-       public <S> ValueState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
+       public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 32ffa7f..db563a0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -18,25 +18,24 @@
 
 package org.apache.flink.api.common.state;
 
-import org.apache.flink.annotation.Public;
-
 import java.io.IOException;
 
 /**
  * This state interface abstracts persistent key/value state in streaming 
programs.
  * The state is accessed and modified by user functions, and checkpointed 
consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is
  * automatically supplied by the system, so the function always sees the value 
mapped to the
  * key of the current element. That way, the system can handle stream and 
state partitioning
  * consistently together.
- * 
+ *
  * @param <T> Type of the value in the operator state
+ * 
+ * @deprecated OperatorState has been replaced by {@link ValueState}.
  */
-@Public
 @Deprecated
-public interface OperatorState<T> {
+public interface OperatorState<T> extends State {
 
        /**
         * Returns the current value for the state. When the state is not
@@ -44,9 +43,9 @@ public interface OperatorState<T> {
         * operator instance. If state partitioning is applied, the value 
returned
         * depends on the current operator input, as the operator maintains an
         * independent state for each partition.
-        * 
+        *
         * @return The operator state value corresponding to the current input.
-        * 
+        *
         * @throws IOException Thrown if the system cannot access the state.
         */
        T value() throws IOException;
@@ -57,12 +56,12 @@ public interface OperatorState<T> {
         * partition) the returned state will represent the updated value. When 
a
         * partitioned state is updated with null, the state for the current 
key 
         * will be removed and the default value is returned on the next access.
-        * 
+        *
         * @param value
         *            The new value for the state.
-        *            
+        *
         * @throws IOException Thrown if the system cannot access the state.
         */
        void update(T value) throws IOException;
-       
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
index 255a735..5a7650e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.common.state;
 
 /**
@@ -26,5 +27,9 @@ package org.apache.flink.api.common.state;
  * consistently together.
  */
 public interface State {
+
+       /**
+        * Removes the value mapped under the current key.
+        */
        void clear();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index f62118d..737133f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -41,7 +41,7 @@ public abstract class StateDescriptor<S extends State> 
implements Serializable {
         * @param name The name of the {@code StateDescriptor}.
         */
        public StateDescriptor(String name) {
-               this.name = requireNonNull(name);;
+               this.name = requireNonNull(name);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index ee246bb..73becd4 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -148,7 +149,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        }
 
        @Override
-       public <S> ValueState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
+       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
                throw new UnsupportedOperationException();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index dda92bc..ad89ea9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
-import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -116,7 +116,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
 
        @Override
        @Deprecated
-       public <S> ValueState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
+       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
                requireNonNull(stateType, "The state type class must not be 
null");
 
                TypeInformation<S> typeInfo;
@@ -134,7 +134,7 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
 
        @Override
        @Deprecated
-       public <S> ValueState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
+       public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
                requireNonNull(name, "The name of the state must not be null");
                requireNonNull(stateType, "The state type information must not 
be null");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cb6a468..4e75b1c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -494,7 +494,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        // start a Thread that does the 
asynchronous materialization and
                                        // then sends the checkpoint acknowledge
 
-                                       String threadName = "Materialize 
checkpoint " + checkpointId + " for " + getName();
+                                       String threadName = "Materialize 
checkpoint state " + checkpointId + " - " + getName();
                                        Thread checkpointThread = new 
Thread(threadName) {
                                                @Override
                                                public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 59c5693..136716d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import 
org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, 
SumAggregator}
@@ -298,10 +299,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
     val cleanFun = clean(fun)
     val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+    val serializer: TypeSerializer[S] = 
stateTypeInfo.createSerializer(getExecutionConfig)
 
     val filterFun = new RichFilterFunction[T] with StatefulFunction[T, 
Boolean, S] {
 
-      override val stateType: TypeInformation[S] = stateTypeInfo
+      override val stateSerializer: TypeSerializer[S] = serializer
 
       override def filter(in: T): Boolean = {
         applyWithState(in, cleanFun)
@@ -326,10 +328,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
     val cleanFun = clean(fun)
     val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+    val serializer: TypeSerializer[S] = 
stateTypeInfo.createSerializer(getExecutionConfig)
     
     val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {
 
-      override val stateType: TypeInformation[S] = stateTypeInfo
+      override val stateSerializer: TypeSerializer[S] = serializer
       
       override def map(in: T): R = {
         applyWithState(in, cleanFun)
@@ -354,10 +357,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, 
K]) extends DataStream[T]
 
     val cleanFun = clean(fun)
     val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
+    val serializer: TypeSerializer[S] = 
stateTypeInfo.createSerializer(getExecutionConfig)
     
     val flatMapper = new RichFlatMapFunction[T, R] with 
StatefulFunction[T,TraversableOnce[R],S]{
 
-      override val stateType: TypeInformation[S] = stateTypeInfo
+      override val stateSerializer: TypeSerializer[S] = serializer
       
       override def flatMap(in: T, out: Collector[R]): Unit = {
         applyWithState(in, cleanFun) foreach out.collect

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
index dc49173..7ef93dd 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.scala.function
 
 import org.apache.flink.api.common.functions.RichFunction
-import org.apache.flink.api.common.state.OperatorState
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.state.{ValueStateDescriptor, ValueState}
+import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.configuration.Configuration
 
 /**
@@ -29,9 +29,11 @@ import org.apache.flink.configuration.Configuration
  * call the applyWithState method in his own RichFunction implementation.
  */
 trait StatefulFunction[I, O, S] extends RichFunction {
+
+  protected val stateSerializer: TypeSerializer[S]
+  
+  private[this] var state: ValueState[S] = _
   
-  var state: OperatorState[S] = _
-  val stateType: TypeInformation[S]
 
   def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
     val (o, s: Option[S]) = fun(in, Option(state.value()))
@@ -43,6 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
   }
 
   override def open(c: Configuration) = {
-    state = getRuntimeContext().getKeyValueState[S]("state", stateType, 
null.asInstanceOf[S])
+    val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], 
stateSerializer)
+    state = getRuntimeContext().getPartitionedState[ValueState[S]](info)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 9bc0040..55293a3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -19,7 +19,9 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -215,13 +217,14 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
 
                                                private boolean open = false;
 
-                                               private OperatorState<Integer> 
count;
+                                               private ValueState<Integer> 
count;
 
                                                @Override
                                                public void open(Configuration 
parameters) {
                                                        
assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
                                                        open = true;
-                                                       count = 
getRuntimeContext().getKeyValueState("count", Integer.class, 0);
+                                                       count = 
getRuntimeContext().getPartitionedState(
+                                                                       new 
ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE));
                                                }
 
                                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 387421e..258ce49 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -27,13 +27,14 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -156,7 +157,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
                private long failurePos;
                private long count;
 
-               private OperatorState<Long> sum;
+               private ValueState<Long> sum;
 
                OnceFailingPartitionedSum(long numElements) {
                        this.numElements = numElements;
@@ -171,7 +172,8 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
 
                        failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
                        count = 0;
-                       sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L);
+                       sum = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
                }
 
                @Override
@@ -192,18 +194,19 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
        private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {
 
                private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>();
-
-               private ValueStateDescriptor<Long> bCountsId = new ValueStateDescriptor<>("b", 0L,
-                               LongSerializer.INSTANCE);
-
-               private OperatorState<NonSerializableLong> aCounts;
+               
+               private ValueState<NonSerializableLong> aCounts;
                private ValueState<Long> bCounts;
 
                @Override
                public void open(Configuration parameters) throws IOException {
-                       aCounts = getRuntimeContext().getKeyValueState(
-                                       "a", NonSerializableLong.class, NonSerializableLong.of(0L));
-                       bCounts = getRuntimeContext().getPartitionedState(bCountsId);
+                       
+                       aCounts = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), 
+                                                       new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));
+                       
+                       bCounts = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
                }
 
                @Override
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index e98696e..a9f4389 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -20,7 +20,9 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -238,7 +240,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
                private long failurePos;
                private long count;
                
-               private OperatorState<Long> pCount;
+               private ValueState<Long> pCount;
                private long inputCount;
 
                OnceFailingPrefixCounter(long numElements) {
@@ -252,7 +254,9 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
 
                        failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
                        count = 0;
-                       pCount = getRuntimeContext().getKeyValueState("pCount", Long.class, 0L);
+                       
+                       pCount = getRuntimeContext().getPartitionedState(
+                                       new ValueStateDescriptor<>("pCount", 0L, LongSerializer.INSTANCE));
                }
                
                @Override

Reply via email to