Repository: flink Updated Branches: refs/heads/master 0e21941e3 -> 0d2c49005
[FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..) Closes #890 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2c4900 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2c4900 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2c4900 Branch: refs/heads/master Commit: 0d2c49005449d6e05bbf53446edac928bb7ecbb6 Parents: 0e21941 Author: Gyula Fora <[email protected]> Authored: Tue Jul 7 11:07:05 2015 +0200 Committer: Gyula Fora <[email protected]> Committed: Tue Jul 7 12:42:29 2015 +0200 ---------------------------------------------------------------------- docs/apis/streaming_guide.md | 6 +++--- .../api/common/functions/RuntimeContext.java | 8 +++---- .../flink/api/common/state/OperatorState.java | 22 ++++++++++---------- .../api/persistent/PersistentKafkaSource.java | 16 +++++++------- .../source/StatefulSequenceSource.java | 6 +++--- .../state/PartitionedStreamOperatorState.java | 4 ++-- .../api/state/StreamOperatorState.java | 12 +++++------ .../api/state/StatefulOperatorTest.java | 22 ++++++++++---------- .../StreamCheckpointingITCase.java | 16 +++++++------- .../ProcessFailureStreamingRecoveryITCase.java | 6 +++--- 10 files changed, 59 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/docs/apis/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index c612b69..7d8ab6d 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user defined operator states Flink supports two types of operator states: partitioned and non-partitioned states. 
-In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper. +In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.value()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `value()` will return number of inputs processed by each parallel mapper. In case of partitioned operator state a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams. @@ -1244,7 +1244,7 @@ public class CounterSum implements RichReduceFunction<Long> { @Override public Long reduce(Long value1, Long value2) throws Exception { - counter.updateState(counter.getState() + 1); + counter.update(counter.value() + 1); return value1 + value2; } @@ -1275,7 +1275,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> { // output and state update are atomic synchronized (lock){ ctx.collect(offset); - offset.updateState(offset.getState() + 1); + offset.update(offset.value() + 1); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 4c8e924..eb84d1c 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -184,9 +184,9 @@ public interface RuntimeContext { * can be used by the same operator. * @param defaultState * Default value for the operator state. This will be returned - * the first time {@link OperatorState#getState()} (for every + * the first time {@link OperatorState#value()} (for every * state partition) is called before - * {@link OperatorState#updateState(Object)}. + * {@link OperatorState#update(Object)}. * @param partitioned * Sets whether partitioning should be applied for the given * state. If true a partitioner key must be used. @@ -216,9 +216,9 @@ public interface RuntimeContext { * can be used by the same operator. * @param defaultState * Default value for the operator state. This will be returned - * the first time {@link OperatorState#getState()} (for every + * the first time {@link OperatorState#value()} (for every * state partition) is called before - * {@link OperatorState#updateState(Object)}. + * {@link OperatorState#update(Object)}. * @param partitioned * Sets whether partitioning should be applied for the given * state. If true a partitioner key must be used. 
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java index 4198a50..955b35b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java @@ -27,8 +27,8 @@ import org.apache.flink.api.common.functions.MapFunction; * partitioned (when state partitioning is defined in the program) or * non-partitioned user states. * - * State can be accessed and manipulated using the {@link #getState()} and - * {@link #updateState(T)} methods. These calls are only safe in the + * State can be accessed and manipulated using the {@link #value()} and + * {@link #update(T)} methods. These calls are only safe in the * transformation call the operator represents, for instance inside * {@link MapFunction#map()} and can lead to unexpected behavior in the * {@link #open(org.apache.flink.configuration.Configuration)} or @@ -40,28 +40,28 @@ import org.apache.flink.api.common.functions.MapFunction; public interface OperatorState<T> { /** - * Gets the current state for the operator. When the state is not - * partitioned the returned state is the same for all inputs in a given - * operator instance. If state partitioning is applied, the state returned + * Returns the current value for the state. When the state is not + * partitioned the returned value is the same for all inputs in a given + * operator instance. If state partitioning is applied, the value returned * depends on the current operator input, as the operator maintains an * independent state for each partition. * - * @return The operator state corresponding to the current input. 
+ * @return The operator state value corresponding to the current input. * * @throws IOException Thrown if the system cannot access the state. */ - T getState() throws IOException; + T value() throws IOException; /** - * Updates the operator state accessible by {@link #getState()} to the given - * value. The next time {@link #getState()} is called (for the same state + * Updates the operator state accessible by {@link #value()} to the given + * value. The next time {@link #value()} is called (for the same state * partition) the returned state will represent the updated value. * * @param state - * The new state. + * The new value for the state. * * @throws IOException Thrown if the system cannot access the state. */ - void updateState(T state) throws IOException; + void update(T value) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java index befbef6..6758f2c 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java @@ -148,17 +148,17 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT> this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false); this.commitedOffsets = new long[numPartitions]; // check if there are offsets to restore - if (!Arrays.equals(lastOffsets.getState(), new long[numPartitions])) { - if (lastOffsets.getState().length != numPartitions) { - throw new IllegalStateException("There are "+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" but " + + if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) { + if (lastOffsets.value().length != numPartitions) { + throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " + "there are only "+numPartitions+" in the topic"); } - LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.getState())); - setOffsetsInZooKeeper(lastOffsets.getState()); + LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value())); + setOffsetsInZooKeeper(lastOffsets.value()); } else { // initialize empty offsets - Arrays.fill(this.lastOffsets.getState(), -1); + Arrays.fill(this.lastOffsets.value(), -1); } Arrays.fill(this.commitedOffsets, 0); // just to make it clear @@ -175,7 +175,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> while (running && iteratorToRead.hasNext()) { MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next(); - if(lastOffsets.getState()[message.partition()] >= message.offset()) { + if(lastOffsets.value()[message.partition()] >= message.offset()) { LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition()); continue; } @@ -188,7 +188,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> // make the state update and the element emission atomic synchronized (checkpointLock) { - lastOffsets.getState()[message.partition()] = message.offset(); + 
lastOffsets.value()[message.partition()] = message.offset(); ctx.collect(next); } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java index 9a2ba4c..2d74e38 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java @@ -63,14 +63,14 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> { ((end - start + 1) / stepSize + 1) : ((end - start + 1) / stepSize); - Long currentCollected = collected.getState(); + Long currentCollected = collected.value(); while (isRunning && currentCollected < toCollect) { synchronized (checkpointLock) { ctx.collect(currentCollected * stepSize + congruence); - collected.updateState(currentCollected + 1); + collected.update(currentCollected + 1); } - currentCollected = collected.getState(); + currentCollected = collected.value(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java index 808b7c8..bfc160f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java @@ -69,7 +69,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten } @Override - public S getState() throws IOException{ + public S value() throws IOException{ if (currentInput == null) { throw new IllegalStateException("Need a valid input for accessing the state."); } else { @@ -87,7 +87,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten } @Override - public void updateState(S state) throws IOException { + public void update(S state) throws IOException { if (state == null) { throw new RuntimeException("Cannot set state to null."); } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java index 1699c27..a80d730 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java @@ -59,12 +59,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS } @Override 
- public S getState() throws IOException { + public S value() throws IOException { return state; } @Override - public void updateState(S state) throws IOException { + public void update(S state) throws IOException { if (state == null) { throw new RuntimeException("Cannot set state to null."); } @@ -72,8 +72,8 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS } public void setDefaultState(S defaultState) throws IOException { - if (getState() == null) { - updateState(defaultState); + if (value() == null) { + update(defaultState); } } @@ -92,12 +92,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState( - getState(), checkpointId, checkpointTimestamp))); + value(), checkpointId, checkpointTimestamp))); } public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception { - updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState())); + update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState())); } public Map<Serializable, S> getPartitionedState() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index 774b431..8c7ffeb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -71,9 +71,9 @@ public class StatefulOperatorTest { processInputs(map, Arrays.asList(1, 2, 3, 4, 5)); assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out); - assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState()); + assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value()); assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState()); - assertEquals("12345", context.getOperatorState("concat", "", false).getState()); + assertEquals("12345", context.getOperatorState("concat", "", false).value()); assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter); byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1)); @@ -81,19 +81,19 @@ public class StatefulOperatorTest { StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState); StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext(); - assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState()); + assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value()); assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState()); - assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState()); + assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value()); assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter); out.clear(); processInputs(restoredMap, Arrays.asList(7, 8)); assertEquals(Arrays.asList("7", "8"), out); - assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).getState()); + assertEquals((Integer) 7, 
restoredContext.getOperatorState("counter", 0, false).value()); assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter") .getPartitionedState()); - assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState()); + assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value()); assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter); } @@ -176,12 +176,12 @@ public class StatefulOperatorTest { @Override public String map(Integer value) throws Exception { - counter.updateState(counter.getState() + 1); - groupCounter.updateState(groupCounter.getState() + 1); - concat.updateState(concat.getState() + value.toString()); + counter.update(counter.value() + 1); + groupCounter.update(groupCounter.value() + 1); + concat.update(concat.value() + value.toString()); checkpointedCounter++; try { - counter.updateState(null); + counter.update(null); fail(); } catch (RuntimeException e){ } @@ -235,7 +235,7 @@ public class StatefulOperatorTest { @Override public String map(Integer value) throws Exception { - groupCounter.updateState(groupCounter.getState() + 1); + groupCounter.update(groupCounter.value() + 1); return value.toString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index e2430d6..a826eff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -198,7 +198,7 @@ public class StreamCheckpointingITCase { static final long[] 
counts = new long[PARALLELISM]; @Override public void close() throws IOException { - counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState(); + counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value(); } @@ -222,8 +222,8 @@ public class StreamCheckpointingITCase { public void run(SourceContext<String> ctx) throws Exception { final Object lockingObject = ctx.getCheckpointLock(); - while (isRunning && index.getState() < numElements) { - char first = (char) ((index.getState() % 40) + 40); + while (isRunning && index.value() < numElements) { + char first = (char) ((index.value() % 40) + 40); stringBuilder.setLength(0); stringBuilder.append(first); @@ -231,7 +231,7 @@ public class StreamCheckpointingITCase { String result = randomString(stringBuilder, rnd); synchronized (lockingObject) { - index.updateState(index.getState() + step); + index.update(index.value() + step); ctx.collect(result); } } @@ -261,7 +261,7 @@ public class StreamCheckpointingITCase { @Override public PrefixCount map(PrefixCount value) throws Exception { - count.updateState(count.getState() + 1); + count.update(count.value() + 1); return value; } @@ -272,7 +272,7 @@ public class StreamCheckpointingITCase { @Override public void close() throws IOException { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState(); + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value(); } } @@ -370,7 +370,7 @@ public class StreamCheckpointingITCase { @Override public PrefixCount map(String value) throws IOException { - count.updateState(count.getState() + 1); + count.update(count.value() + 1); return new PrefixCount(value.substring(0, 1), value, 1L); } @@ -381,7 +381,7 @@ public class StreamCheckpointingITCase { @Override public void close() throws IOException { - counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState(); + counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value(); } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java index d8c925d..decc861 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java @@ -133,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE); boolean checkForProceedFile = true; - while (isRunning && collected.getState() < toCollect) { + while (isRunning && collected.value() < toCollect) { // check if the proceed file exists (then we go full speed) // if not, we always recheck and sleep if (checkForProceedFile) { @@ -146,8 +146,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur } synchronized (checkpointLock) { - sourceCtx.collect(collected.getState() * stepSize + congruence); - collected.updateState(collected.getState() + 1); + sourceCtx.collect(collected.value() * stepSize + congruence); + collected.update(collected.value() + 1); } } }
