Repository: flink Updated Branches: refs/heads/master 680c2c3ec -> 1198664cb
[FLINK-3201] Add operator state to make change backwards compatible Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f755961 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f755961 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f755961 Branch: refs/heads/master Commit: 6f75596112ba5d36d88cc7c2de74acee8f683ca9 Parents: 524e56b Author: Stephan Ewen <[email protected]> Authored: Fri Jan 29 10:33:19 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:27:51 2016 +0100 ---------------------------------------------------------------------- .../state/DBStateCheckpointingTest.java | 28 +++++++++++++------- .../api/common/functions/RuntimeContext.java | 17 +++++++----- .../util/AbstractRuntimeUDFContext.java | 5 ++-- .../flink/api/common/state/OperatorState.java | 23 ++++++++-------- .../apache/flink/api/common/state/State.java | 5 ++++ .../flink/api/common/state/StateDescriptor.java | 2 +- .../kafka/testutils/MockRuntimeContext.java | 3 ++- .../api/operators/StreamingRuntimeContext.java | 6 ++--- .../streaming/runtime/tasks/StreamTask.java | 2 +- .../flink/streaming/api/scala/KeyedStream.scala | 10 ++++--- .../api/scala/function/StatefulFunction.scala | 13 +++++---- .../EventTimeWindowCheckpointingITCase.java | 9 ++++--- .../PartitionedStateCheckpointingITCase.java | 25 +++++++++-------- .../StreamCheckpointingITCase.java | 10 ++++--- 14 files changed, 97 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java index 87dabf8..3adf5aa 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java @@ -32,9 +32,13 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.FileUtils; import org.apache.derby.drda.NetworkServerControl; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -46,6 +50,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector; import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong; import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; + import org.junit.After; import org.junit.Before; @@ -182,7 +187,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> { - private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>(); + private static Map<Integer, Long> allSums = new ConcurrentHashMap<>(); private static volatile boolean hasFailed = false; @@ -191,7 +196,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { private long failurePos; private long count; - private OperatorState<Long> sum; + private ValueState<Long> sum; OnceFailingPartitionedSum(long numElements) { this.numElements = numElements; @@ -204,7 +209,8 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L); + sum = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE)); } @Override @@ -224,15 +230,19 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> { - private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>(); + private static Map<Integer, Long> allCounts = new ConcurrentHashMap<>(); - private OperatorState<NonSerializableLong> aCounts; - private OperatorState<Long> bCounts; + private ValueState<NonSerializableLong> aCounts; + private ValueState<Long> bCounts; @Override public void open(Configuration parameters) throws IOException { - aCounts = getRuntimeContext().getKeyValueState("a", NonSerializableLong.class, NonSerializableLong.of(0L)); - bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L); + aCounts = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), + new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig()))); + + bCounts = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index a419d1e..d37f7eb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.accumulators.Histogram; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; @@ -145,6 +146,7 @@ public interface RuntimeContext { /** * Convenience function to create a counter object for histograms. */ + @Experimental Histogram getHistogram(String name); // -------------------------------------------------------------------------------------------- @@ -208,13 +210,11 @@ public interface RuntimeContext { * * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { * - * private ValueStateDescriptor<Long> countIdentifier = - * new ValueStateDescriptor<>("count", 0L, LongSerializer.INSTANCE); - * * private ValueState<Long> count; * * public void open(Configuration cfg) { - * state = getRuntimeContext().getPartitionedState(countIdentifier); + * state = getRuntimeContext().getPartitionedState( + * new ValueStateDescriptor<Long>("count", 0L, LongSerializer.INSTANCE)); * } * * public Tuple2<MyType, Long> map(MyType value) { @@ -291,9 +291,11 @@ public interface RuntimeContext { * * @throws UnsupportedOperationException Thrown, if no key/value state is available for the * function (function is not part os a KeyedStream). + * + * @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead. */ @Deprecated - <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); + <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); /** * Gets the key/value state, which is only accessible if the function is executed on @@ -330,7 +332,6 @@ public interface RuntimeContext { * * }</pre> * - * * @param name The name of the key/value state. * @param stateType The type information for the type that is stored in the state. * Used to create serializers for managed memory and checkpoints. @@ -342,7 +343,9 @@ public interface RuntimeContext { * * @throws UnsupportedOperationException Thrown, if no key/value state is available for the * function (function is not part os a KeyedStream). + * + * @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead. */ @Deprecated - <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); + <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); } http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index fe18994..45ef179 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; @@ -179,14 +180,14 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { @Override @Deprecated - public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } @Override @Deprecated - public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java index 32ffa7f..db563a0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java @@ -18,25 +18,24 @@ package org.apache.flink.api.common.state; -import org.apache.flink.annotation.Public; - import java.io.IOException; /** * This state interface abstracts persistent key/value state in streaming programs. * The state is accessed and modified by user functions, and checkpointed consistently * by the system as part of the distributed snapshots. - * + * * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is * automatically supplied by the system, so the function always sees the value mapped to the * key of the current element. That way, the system can handle stream and state partitioning * consistently together. - * + * * @param <T> Type of the value in the operator state + * + * @deprecated OperatorState has been replaced by {@link ValueState}. */ -@Public @Deprecated -public interface OperatorState<T> { +public interface OperatorState<T> extends State { /** * Returns the current value for the state. When the state is not @@ -44,9 +43,9 @@ public interface OperatorState<T> { * operator instance. If state partitioning is applied, the value returned * depends on the current operator input, as the operator maintains an * independent state for each partition. - * + * * @return The operator state value corresponding to the current input. - * + * * @throws IOException Thrown if the system cannot access the state. */ T value() throws IOException; @@ -57,12 +56,12 @@ public interface OperatorState<T> { * partition) the returned state will represent the updated value. When a * partitioned state is updated with null, the state for the current key * will be removed and the default value is returned on the next access. - * + * * @param value * The new value for the state. - * + * * @throws IOException Thrown if the system cannot access the state. */ void update(T value) throws IOException; - -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/State.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java index 255a735..5a7650e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.common.state; /** @@ -26,5 +27,9 @@ package org.apache.flink.api.common.state; * consistently together. */ public interface State { + + /** + * Removes the value mapped under the current key. + */ void clear(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index f62118d..737133f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -41,7 +41,7 @@ public abstract class StateDescriptor<S extends State> implements Serializable { * @param name The name of the {@code StateDescriptor}. */ public StateDescriptor(String name) { - this.name = requireNonNull(name);; + this.name = requireNonNull(name); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index ee246bb..73becd4 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -148,7 +149,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { } @Override - public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index dda92bc..ad89ea9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; -import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -116,7 +116,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { @Override @Deprecated - public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { requireNonNull(stateType, "The state type class must not be null"); TypeInformation<S> typeInfo; @@ -134,7 +134,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { @Override @Deprecated - public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { requireNonNull(name, "The name of the state must not be null"); requireNonNull(stateType, "The state type information must not be null"); http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index cb6a468..4e75b1c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -494,7 +494,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // start a Thread that does the asynchronous materialization and // then sends the checkpoint acknowledge - String threadName = "Materialize checkpoint " + checkpointId + " for " + getName(); + String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName(); Thread checkpointThread = new Thread(threadName) { @Override public void run() { http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 59c5693..136716d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} @@ -298,10 +299,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] val cleanFun = clean(fun) val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] + val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig) val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] { - override val stateType: TypeInformation[S] = stateTypeInfo + override val stateSerializer: TypeSerializer[S] = serializer override def filter(in: T): Boolean = { applyWithState(in, cleanFun) @@ -326,10 +328,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] val cleanFun = clean(fun) val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] + val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig) val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] { - override val stateType: TypeInformation[S] = stateTypeInfo + override val stateSerializer: TypeSerializer[S] = serializer override def map(in: T): R = { applyWithState(in, cleanFun) @@ -354,10 +357,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] val cleanFun = clean(fun) val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] + val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig) val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{ - override val stateType: TypeInformation[S] = stateTypeInfo + override val stateSerializer: TypeSerializer[S] = serializer override def flatMap(in: T, out: Collector[R]): Unit = { applyWithState(in, cleanFun) foreach out.collect http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala index dc49173..7ef93dd 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.scala.function import org.apache.flink.api.common.functions.RichFunction -import org.apache.flink.api.common.state.OperatorState -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.state.{ValueStateDescriptor, ValueState} +import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.configuration.Configuration /** @@ -29,9 +29,11 @@ import org.apache.flink.configuration.Configuration * call the applyWithState method in his own RichFunction implementation. */ trait StatefulFunction[I, O, S] extends RichFunction { + + protected val stateSerializer: TypeSerializer[S] + + private[this] var state: ValueState[S] = _ - var state: OperatorState[S] = _ - val stateType: TypeInformation[S] def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = { val (o, s: Option[S]) = fun(in, Option(state.value())) @@ -43,6 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction { } override def open(c: Configuration) = { - state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S]) + val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], stateSerializer) + state = getRuntimeContext().getPartitionedState[ValueState[S]](info) } } http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 9bc0040..55293a3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -19,7 +19,9 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; @@ -215,13 +217,14 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { private boolean open = false; - private OperatorState<Integer> count; + private ValueState<Integer> count; @Override public void open(Configuration parameters) { assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); open = true; - count = getRuntimeContext().getKeyValueState("count", Integer.class, 0); + count = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index 387421e..258ce49 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -27,13 +27,14 @@ import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -156,7 +157,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes private long failurePos; private long count; - private OperatorState<Long> sum; + private ValueState<Long> sum; OnceFailingPartitionedSum(long numElements) { this.numElements = numElements; @@ -171,7 +172,8 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L); + sum = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE)); } @Override @@ -192,18 +194,19 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> { private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>(); - - private ValueStateDescriptor<Long> bCountsId = new ValueStateDescriptor<>("b", 0L, - LongSerializer.INSTANCE); - - private OperatorState<NonSerializableLong> aCounts; + + private ValueState<NonSerializableLong> aCounts; private ValueState<Long> bCounts; @Override public void open(Configuration parameters) throws IOException { - aCounts = getRuntimeContext().getKeyValueState( - "a", NonSerializableLong.class, NonSerializableLong.of(0L)); - bCounts = getRuntimeContext().getPartitionedState(bCountsId); + + aCounts = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), + new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig()))); + + bCounts = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6f755961/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index e98696e..a9f4389 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -20,7 +20,9 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -238,7 +240,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { private long failurePos; private long count; - private OperatorState<Long> pCount; + private ValueState<Long> pCount; private long inputCount; OnceFailingPrefixCounter(long numElements) { @@ -252,7 +254,9 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - pCount = getRuntimeContext().getKeyValueState("pCount", Long.class, 0L); + + pCount = getRuntimeContext().getPartitionedState( + new ValueStateDescriptor<>("pCount", 0L, LongSerializer.INSTANCE)); } @Override
