[FLINK-3312] Add accessors for various state types to RuntimeContext
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4d05f72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4d05f72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4d05f72

Branch: refs/heads/master
Commit: e4d05f72e91f03e4f350cf42c31c5ec85fe86c53
Parents: 6f75596
Author: Stephan Ewen <[email protected]>
Authored: Fri Jan 29 14:53:14 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:27:52 2016 +0100

----------------------------------------------------------------------
 .../state/DBStateCheckpointingTest.java         |   6 +-
 .../api/common/functions/RuntimeContext.java    | 117 ++++++++++++++++---
 .../util/AbstractRuntimeUDFContext.java         |  21 +++-
 .../kafka/testutils/MockRuntimeContext.java     |  23 +++-
 .../api/operators/StreamingRuntimeContext.java  |  39 +++++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  13 +--
 ...AlignedProcessingTimeWindowOperatorTest.java |   8 +-
 .../api/scala/function/StatefulFunction.scala   |   2 +-
 .../EventTimeWindowCheckpointingITCase.java     |   2 +-
 .../PartitionedStateCheckpointingITCase.java    |   6 +-
 .../StreamCheckpointingITCase.java              |   2 +-
 11 files changed, 187 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
index 3adf5aa..0afdada 100644
--- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java
@@ -209,7 +209,7 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {
             failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;

             count = 0;
-            sum = getRuntimeContext().getPartitionedState(
+            sum = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
         }

@@ -237,11 +237,11 @@ public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase {

         @Override
         public void open(Configuration parameters) throws IOException {
-            aCounts = getRuntimeContext().getPartitionedState(
+            aCounts = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("a", NonSerializableLong.of(0L),
                         new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));

-            bCounts = getRuntimeContext().getPartitionedState(
+            bCounts = getRuntimeContext().getState(
                     new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index d37f7eb..92dd518 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -31,10 +31,13 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

 /**
@@ -188,13 +191,15 @@
      */
     DistributedCache getDistributedCache();

-    // --------------------------------------------------------------------------------------------
+    // ------------------------------------------------------------------------
+    //  Methods for accessing state
+    // ------------------------------------------------------------------------

     /**
-     * Gets the partitioned state, which is only accessible if the function is executed on
-     * a KeyedStream. When interacting with the state only the instance bound to the key of the
-     * element currently processed by the function is changed.
-     * Each operator may maintain multiple partitioned states, addressed with different names.
+     * Gets a handle to the system's key/value state. The key/value state is only accessible
+     * if the function is executed on a KeyedStream. On each access, the state exposes the value
+     * for the key of the element currently processed by the function.
+     * Each function may have multiple partitioned states, addressed with different names.
      *
      * <p>Because the scope of each value is the key of the currently processed element,
      * and the elements are distributed by the Flink runtime, the system can transparently
@@ -213,32 +218,112 @@
      *     private ValueState<Long> count;
      *
      *     public void open(Configuration cfg) {
-     *         state = getRuntimeContext().getPartitionedState(
+     *         state = getRuntimeContext().getState(
      *                 new ValueStateDescriptor<Long>("count", 0L, LongSerializer.INSTANCE));
      *     }
      *
      *     public Tuple2<MyType, Long> map(MyType value) {
-     *         long count = state.value();
-     *         state.update(value + 1);
+     *         long count = state.value() + 1;
+     *         state.update(count);
      *         return new Tuple2<>(value, count);
      *     }
      * });
+     * }</pre>
+     *
+     * @param stateProperties The descriptor defining the properties of the state.
+     *
+     * @param <T> The type of value stored in the state.
+     *
+     * @return The partitioned state object.
+     *
+     * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+     *                                       function (function is not part of a KeyedStream).
+     */
+    <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
+
+    /**
+     * Gets a handle to the system's key/value list state. This state is similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+     * holds lists. One can add elements to the list, or retrieve the list as a whole.
+     *
+     * <p>This state is only accessible if the function is executed on a KeyedStream.
+     *
+     * <pre>{@code
+     * DataStream<MyType> stream = ...;
+     * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+     *
+     * keyedStream.flatMap(new RichFlatMapFunction<MyType, MyType>() {
+     *
+     *     private ListState<MyType> state;
      *
+     *     public void open(Configuration cfg) {
+     *         state = getRuntimeContext().getListState(
+     *                 new ListStateDescriptor<>("myState", MyType.class));
+     *     }
+     *
+     *     public void flatMap(MyType value, Collector<MyType> out) {
+     *         if (value.isDivider()) {
+     *             for (MyType t : state.get()) {
+     *                 out.collect(t);
+     *             }
+     *         } else {
+     *             state.add(value);
+     *         }
+     *     }
+     * });
      * }</pre>
      *
-     * @param stateDescriptor The StateDescriptor that contains the name and type of the
-     *                        state that is being accessed.
+     * @param stateProperties The descriptor defining the properties of the state.
-     * @param <S> The type of the state.
+     * @param <T> The type of value stored in the state.
      *
      * @return The partitioned state object.
      *
      * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
      *                                       function (function is not part os a KeyedStream).
      */
-    <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor);
+    <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
     /**
+     * Gets a handle to the system's key/value reducing state. This state is similar to the state
+     * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
+     * aggregates values.
+     *
+     * <p>This state is only accessible if the function is executed on a KeyedStream.
+     *
+     * <pre>{@code
+     * DataStream<MyType> stream = ...;
+     * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+     *
+     * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+     *
+     *     private ReducingState<Long> sum;
+     *
+     *     public void open(Configuration cfg) {
+     *         sum = getRuntimeContext().getReducingState(
+     *                 new ReducingStateDescriptor<>("sum", Long.class, 0L, (a, b) -> a + b));
+     *     }
+     *
+     *     public Tuple2<MyType, Long> map(MyType value) {
+     *         sum.add(value.count());
+     *         return new Tuple2<>(value, sum.get());
+     *     }
+     * });
+     *
+     * }</pre>
+     *
+     * @param stateProperties The descriptor defining the properties of the state.
+     *
+     * @param <T> The type of value stored in the state.
+     *
+     * @return The partitioned state object.
+     *
+     * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
+     *                                       function (function is not part of a KeyedStream).
+     */
+    <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
+
+    /**
      * Gets the key/value state, which is only accessible if the function is executed on
      * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will
      * return the value bound to the key of the element currently processed by the function.
@@ -292,7 +377,7 @@
      * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
      *                                       function (function is not part os a KeyedStream).
      *
-     * @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead.
+     * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
      */
     @Deprecated
     <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
@@ -344,7 +429,7 @@
      * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
      *                                       function (function is not part os a KeyedStream).
      *
-     * @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead.
+     * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
      */
     @Deprecated
     <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);

http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 45ef179..e0b53f6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -34,10 +34,13 @@
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
@@ -172,10 +175,21 @@
     }

     @Override
-    public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) {
+    public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
         throw new UnsupportedOperationException(
                 "This state is only accessible by functions executed on a KeyedStream");
+    }
+
+    @Override
+    public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a KeyedStream");
+    }
+    @Override
+    public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+        throw new UnsupportedOperationException(
+                "This state is only accessible by functions executed on a KeyedStream");
     }

     @Override
@@ -191,5 +205,4 @@
         throw new UnsupportedOperationException(
                 "This state is only accessible by functions executed on a KeyedStream");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 73becd4..cd44236 100644
---
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -26,10 +26,13 @@ import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -54,7 +57,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { this.indexOfThisSubtask = indexOfThisSubtask; } - private static class MockStreamOperator extends AbstractStreamOperator { + private static class MockStreamOperator extends AbstractStreamOperator<Integer> { private static final long serialVersionUID = -1153976702711944427L; @Override @@ -154,12 +157,22 @@ public class MockRuntimeContext extends StreamingRuntimeContext { } @Override - public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { + public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { throw new UnsupportedOperationException(); } @Override - public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) { + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index ad89ea9..f99ab93 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -21,9 +21,12 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; +import 
org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -106,11 +109,32 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { // ------------------------------------------------------------------------ @Override - public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) { + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); try { - return operator.getPartitionedState(stateDescriptor); + return operator.getPartitionedState(stateProperties); } catch (Exception e) { - throw new RuntimeException("Error while getting state.", e); + throw new RuntimeException("Error while getting state", e); + } + } + + @Override + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + return operator.getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + + @Override + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + return operator.getPartitionedState(stateProperties); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); } } @@ -138,8 +162,9 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { requireNonNull(name, "The name of the state must not be null"); requireNonNull(stateType, "The state type information must not be null"); - ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); - return getPartitionedState(stateDesc); + ValueStateDescriptor<S> stateProps = + new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + return getState(stateProps); } // ------------------ expose (read only) relevant information from the stream config -------- // http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 70adadf..f2f8c5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -21,7 +21,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -521,11 +522,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { finalResult.addAll(out2.getElements()); assertEquals(numElements, finalResult.size()); - synchronized (lock) { - op.close(); - } - op.dispose(); - Collections.sort(finalResult); for (int i = 0; i < numElements; i++) { assertEquals(i, finalResult.get(i).intValue()); @@ -761,12 +757,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // get "volatile" style access to entries static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); - private OperatorState<Integer> state; + private ValueState<Integer> state; @Override public void open(Configuration parameters) { assertNotNull(getRuntimeContext()); - state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 0); + state = getRuntimeContext().getState( + new ValueStateDescriptor<>("totalCount", 0, IntSerializer.INSTANCE)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 5c37f36..e88f6de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -23,7 +23,8 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; @@ -934,7 +935,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>(); - private OperatorState<Integer> state; + private ValueState<Integer> state; @Override public void open(Configuration parameters) { @@ -942,7 +943,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest { // start with one, so the final count is correct and we test that we do not // initialize with 0 always by default - state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 1); + state = getRuntimeContext().getState( + new ValueStateDescriptor<>("totalCount", 1, IntSerializer.INSTANCE)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala index 7ef93dd..5e1d1db 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala @@ -46,6 +46,6 @@ trait StatefulFunction[I, O, S] extends RichFunction { override def open(c: Configuration) = { val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], stateSerializer) - state = getRuntimeContext().getPartitionedState[ValueState[S]](info) + state = getRuntimeContext().getState(info) } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 55293a3..19ff090 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -223,7 +223,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { public void open(Configuration parameters) { assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); open = true; - count = getRuntimeContext().getPartitionedState( + count = getRuntimeContext().getState( new ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index 258ce49..a2d6c24 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -172,7 +172,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - sum = getRuntimeContext().getPartitionedState( + sum = getRuntimeContext().getState( new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE)); } @@ 
-201,11 +201,11 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes @Override public void open(Configuration parameters) throws IOException { - aCounts = getRuntimeContext().getPartitionedState( + aCounts = getRuntimeContext().getState( new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig()))); - bCounts = getRuntimeContext().getPartitionedState( + bCounts = getRuntimeContext().getState( new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e4d05f72/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index a9f4389..d8131c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -255,7 +255,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - pCount = getRuntimeContext().getPartitionedState( + pCount = getRuntimeContext().getState( new ValueStateDescriptor<>("pCount", 0L, LongSerializer.INSTANCE)); }
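
----------------------------------------------------------------------

For quick reference, the sketch below shows how a user function is expected to call the new getState accessor added by this commit. It is only illustrative: the class name CountPerKey, the String key type, and the keyBy field are assumptions made for the example, and the ValueStateDescriptor arguments follow the (name, default value, serializer) form used throughout the patch above.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Emits a running count per key. Must run on a KeyedStream, e.g.
 * stream.keyBy("id").flatMap(new CountPerKey()); otherwise getState(...)
 * throws UnsupportedOperationException, as documented in the new Javadoc.
 */
public class CountPerKey extends RichFlatMapFunction<String, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // New accessor, replacing getRuntimeContext().getPartitionedState(...)
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", 0L, LongSerializer.INSTANCE));
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        long newCount = count.value() + 1; // value() returns the default (0L) before the first update
        count.update(newCount);
        out.collect(new Tuple2<>(value, newCount));
    }
}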

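A similar sketch for the new getListState accessor. Again purely illustrative: the class name BufferPerKey and the empty-string flush marker are assumptions, and the (name, element class) ListStateDescriptor constructor is taken from the Javadoc example in the patch rather than from a verified API reference.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Buffers elements per key and flushes the buffer when an empty string arrives.
 * Must run on a KeyedStream for getListState(...) to be available.
 */
public class BufferPerKey extends RichFlatMapFunction<String, String> {

    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        // Descriptor form (name, element class) as shown in the getListState Javadoc above.
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        if (value.isEmpty()) {
            for (String buffered : buffer.get()) { // get() returns all elements stored for the current key
                out.collect(buffered);
            }
        } else {
            buffer.add(value); // append to this key's list
        }
    }
}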