Repository: flink
Updated Branches:
  refs/heads/master 0e21941e3 -> 0d2c49005


[FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() 
and update(..)

Closes #890


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2c4900
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2c4900
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2c4900

Branch: refs/heads/master
Commit: 0d2c49005449d6e05bbf53446edac928bb7ecbb6
Parents: 0e21941
Author: Gyula Fora <[email protected]>
Authored: Tue Jul 7 11:07:05 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Tue Jul 7 12:42:29 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  6 +++---
 .../api/common/functions/RuntimeContext.java    |  8 +++----
 .../flink/api/common/state/OperatorState.java   | 22 ++++++++++----------
 .../api/persistent/PersistentKafkaSource.java   | 16 +++++++-------
 .../source/StatefulSequenceSource.java          |  6 +++---
 .../state/PartitionedStreamOperatorState.java   |  4 ++--
 .../api/state/StreamOperatorState.java          | 12 +++++------
 .../api/state/StatefulOperatorTest.java         | 22 ++++++++++----------
 .../StreamCheckpointingITCase.java              | 16 +++++++-------
 .../ProcessFailureStreamingRecoveryITCase.java  |  6 +++---
 10 files changed, 59 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c612b69..7d8ab6d 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user 
defined operator states
 
 Flink supports two types of operator states: partitioned and non-partitioned 
states.
 
-In case of non-partitioned operator state, an operator state is maintained for 
each parallel instance of a given operator. When `OperatorState.getState()` is 
called, a separate state is returned in each parallel instance. In practice 
this means if we keep a counter for the received inputs in a mapper, 
`getState()` will return number of inputs processed by each parallel mapper.
+In case of non-partitioned operator state, an operator state is maintained for 
each parallel instance of a given operator. When `OperatorState.value()` is 
called, a separate state is returned in each parallel instance. In practice 
this means if we keep a counter for the received inputs in a mapper, `value()` 
will return the number of inputs processed by each parallel mapper.
 
 In case of of partitioned operator state a separate state is maintained for 
each received key. This can be used for instance to count received inputs by 
different keys, or store and update summary statistics of different sub-streams.
 
@@ -1244,7 +1244,7 @@ public class CounterSum implements 
RichReduceFunction<Long> {
 
     @Override
     public Long reduce(Long value1, Long value2) throws Exception {
-        counter.updateState(counter.getState() + 1);
+        counter.update(counter.value() + 1);
         return value1 + value2;
     }
 
@@ -1275,7 +1275,7 @@ public static class CounterSource implements 
RichParallelSourceFunction<Long> {
             // output and state update are atomic
             synchronized (lock){
                 ctx.collect(offset);
-                offset.updateState(offset.getState() + 1);
+                offset.update(offset.value() + 1);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 4c8e924..eb84d1c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -184,9 +184,9 @@ public interface RuntimeContext {
         *            can be used by the same operator.
         * @param defaultState
         *            Default value for the operator state. This will be 
returned
-        *            the first time {@link OperatorState#getState()} (for every
+        *            the first time {@link OperatorState#value()} (for every
         *            state partition) is called before
-        *            {@link OperatorState#updateState(Object)}.
+        *            {@link OperatorState#update(Object)}.
         * @param partitioned
         *            Sets whether partitioning should be applied for the given
         *            state. If true a partitioner key must be used.
@@ -216,9 +216,9 @@ public interface RuntimeContext {
         *            can be used by the same operator.
         * @param defaultState
         *            Default value for the operator state. This will be 
returned
-        *            the first time {@link OperatorState#getState()} (for every
+        *            the first time {@link OperatorState#value()} (for every
         *            state partition) is called before
-        *            {@link OperatorState#updateState(Object)}.
+        *            {@link OperatorState#update(Object)}.
         * @param partitioned
         *            Sets whether partitioning should be applied for the given
         *            state. If true a partitioner key must be used.

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 4198a50..955b35b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.functions.MapFunction;
  * partitioned (when state partitioning is defined in the program) or
  * non-partitioned user states.
  * 
- * State can be accessed and manipulated using the {@link #getState()} and
- * {@link #updateState(T)} methods. These calls are only safe in the
+ * State can be accessed and manipulated using the {@link #value()} and
+ * {@link #update(Object)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
  * {@link MapFunction#map()} and can lead tp unexpected behavior in the
  * {@link #open(org.apache.flink.configuration.Configuration)} or
@@ -40,28 +40,28 @@ import org.apache.flink.api.common.functions.MapFunction;
 public interface OperatorState<T> {
 
        /**
-        * Gets the current state for the operator. When the state is not
-        * partitioned the returned state is the same for all inputs in a given
-        * operator instance. If state partitioning is applied, the state 
returned
+        * Returns the current value for the state. When the state is not
+        * partitioned the returned value is the same for all inputs in a given
+        * operator instance. If state partitioning is applied, the value 
returned
         * depends on the current operator input, as the operator maintains an
         * independent state for each partition.
         * 
-        * @return The operator state corresponding to the current input.
+        * @return The operator state value corresponding to the current input.
         * 
         * @throws IOException Thrown if the system cannot access the state.
         */
-       T getState() throws IOException;
+       T value() throws IOException;
 
        /**
-        * Updates the operator state accessible by {@link #getState()} to the 
given
-        * value. The next time {@link #getState()} is called (for the same 
state
+        * Updates the operator state accessible by {@link #value()} to the 
given
+        * value. The next time {@link #value()} is called (for the same state
         * partition) the returned state will represent the updated value.
         * 
         * @param state
-        *            The new state.
+        *            The new value for the state.
         *            
         * @throws IOException Thrown if the system cannot access the state.
         */
-       void updateState(T state) throws IOException;
+       void update(T value) throws IOException;
        
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index befbef6..6758f2c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -148,17 +148,17 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                this.lastOffsets = 
getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
                this.commitedOffsets = new long[numPartitions];
                // check if there are offsets to restore
-               if (!Arrays.equals(lastOffsets.getState(), new 
long[numPartitions])) {
-                       if (lastOffsets.getState().length != numPartitions) {
-                               throw new IllegalStateException("There are 
"+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" 
but " +
+               if (!Arrays.equals(lastOffsets.value(), new 
long[numPartitions])) {
+                       if (lastOffsets.value().length != numPartitions) {
+                               throw new IllegalStateException("There are 
"+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " 
+
                                                "there are only 
"+numPartitions+" in the topic");
                        }
 
-                       LOG.info("Setting restored offsets {} in ZooKeeper", 
Arrays.toString(lastOffsets.getState()));
-                       setOffsetsInZooKeeper(lastOffsets.getState());
+                       LOG.info("Setting restored offsets {} in ZooKeeper", 
Arrays.toString(lastOffsets.value()));
+                       setOffsetsInZooKeeper(lastOffsets.value());
                } else {
                        // initialize empty offsets
-                       Arrays.fill(this.lastOffsets.getState(), -1);
+                       Arrays.fill(this.lastOffsets.value(), -1);
                }
                Arrays.fill(this.commitedOffsets, 0); // just to make it clear
                
@@ -175,7 +175,7 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
                
                while (running && iteratorToRead.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> message = 
iteratorToRead.next();
-                       if(lastOffsets.getState()[message.partition()] >= 
message.offset()) {
+                       if(lastOffsets.value()[message.partition()] >= 
message.offset()) {
                                LOG.info("Skipping message with offset {} from 
partition {}", message.offset(), message.partition());
                                continue;
                        }
@@ -188,7 +188,7 @@ public class PersistentKafkaSource<OUT> extends 
RichParallelSourceFunction<OUT>
 
                        // make the state update and the element emission atomic
                        synchronized (checkpointLock) {
-                               lastOffsets.getState()[message.partition()] = 
message.offset();
+                               lastOffsets.value()[message.partition()] = 
message.offset();
                                ctx.collect(next);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 9a2ba4c..2d74e38 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -63,14 +63,14 @@ public class StatefulSequenceSource extends 
RichParallelSourceFunction<Long> {
                                        ((end - start + 1) / stepSize + 1) :
                                        ((end - start + 1) / stepSize);
                                        
-               Long currentCollected = collected.getState();
+               Long currentCollected = collected.value();
 
                while (isRunning && currentCollected < toCollect) {
                        synchronized (checkpointLock) {
                                ctx.collect(currentCollected * stepSize + 
congruence);
-                               collected.updateState(currentCollected + 1);
+                               collected.update(currentCollected + 1);
                        }
-                       currentCollected = collected.getState();
+                       currentCollected = collected.value();
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 808b7c8..bfc160f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -69,7 +69,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends 
Serializable> exten
        }
 
        @Override
-       public S getState() throws IOException{
+       public S value() throws IOException{
                if (currentInput == null) {
                        throw new IllegalStateException("Need a valid input for 
accessing the state.");
                } else {
@@ -87,7 +87,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends 
Serializable> exten
        }
 
        @Override
-       public void updateState(S state) throws IOException {
+       public void update(S state) throws IOException {
                if (state == null) {
                        throw new RuntimeException("Cannot set state to null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 1699c27..a80d730 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -59,12 +59,12 @@ public class StreamOperatorState<S, C extends Serializable> 
implements OperatorS
        }
 
        @Override
-       public S getState() throws IOException {
+       public S value() throws IOException {
                return state;
        }
 
        @Override
-       public void updateState(S state) throws IOException {
+       public void update(S state) throws IOException {
                if (state == null) {
                        throw new RuntimeException("Cannot set state to null.");
                }
@@ -72,8 +72,8 @@ public class StreamOperatorState<S, C extends Serializable> 
implements OperatorS
        }
        
        public void setDefaultState(S defaultState) throws IOException {
-               if (getState() == null) {
-                       updateState(defaultState);
+               if (value() == null) {
+                       update(defaultState);
                }
        }
 
@@ -92,12 +92,12 @@ public class StreamOperatorState<S, C extends Serializable> 
implements OperatorS
        public Map<Serializable, StateHandle<C>> snapshotState(long 
checkpointId,
                        long checkpointTimestamp) throws Exception {
                return ImmutableMap.of(DEFAULTKEY, 
provider.createStateHandle(checkpointer.snapshotState(
-                               getState(), checkpointId, 
checkpointTimestamp)));
+                               value(), checkpointId, checkpointTimestamp)));
 
        }
 
        public void restoreState(Map<Serializable, StateHandle<C>> snapshots) 
throws Exception {
-               
updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
+               
update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
        }
 
        public Map<Serializable, S> getPartitionedState() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 774b431..8c7ffeb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -71,9 +71,9 @@ public class StatefulOperatorTest {
                processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
 
                assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
-               assertEquals((Integer) 5, context.getOperatorState("counter", 
0, false).getState());
+               assertEquals((Integer) 5, context.getOperatorState("counter", 
0, false).value());
                assertEquals(ImmutableMap.of(0, 2, 1, 3), 
context.getOperatorStates().get("groupCounter").getPartitionedState());
-               assertEquals("12345", context.getOperatorState("concat", "", 
false).getState());
+               assertEquals("12345", context.getOperatorState("concat", "", 
false).value());
                assertEquals((Integer) 5, ((StatefulMapper) 
map.getUserFunction()).checkpointedCounter);
 
                byte[] serializedState = 
InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
@@ -81,19 +81,19 @@ public class StatefulOperatorTest {
                StreamMap<Integer, String> restoredMap = 
createOperatorWithContext(out, new ModKey(2), serializedState);
                StreamingRuntimeContext restoredContext = 
restoredMap.getRuntimeContext();
 
-               assertEquals((Integer) 5, 
restoredContext.getOperatorState("counter", 0, false).getState());
+               assertEquals((Integer) 5, 
restoredContext.getOperatorState("counter", 0, false).value());
                assertEquals(ImmutableMap.of(0, 2, 1, 3), 
context.getOperatorStates().get("groupCounter").getPartitionedState());
-               assertEquals("12345", 
restoredContext.getOperatorState("concat", "", false).getState());
+               assertEquals("12345", 
restoredContext.getOperatorState("concat", "", false).value());
                assertEquals((Integer) 5, ((StatefulMapper) 
restoredMap.getUserFunction()).checkpointedCounter);
                out.clear();
 
                processInputs(restoredMap, Arrays.asList(7, 8));
 
                assertEquals(Arrays.asList("7", "8"), out);
-               assertEquals((Integer) 7, 
restoredContext.getOperatorState("counter", 0, false).getState());
+               assertEquals((Integer) 7, 
restoredContext.getOperatorState("counter", 0, false).value());
                assertEquals(ImmutableMap.of(0, 3, 1, 4), 
restoredContext.getOperatorStates().get("groupCounter")
                                .getPartitionedState());
-               assertEquals("1234578", 
restoredContext.getOperatorState("concat", "", false).getState());
+               assertEquals("1234578", 
restoredContext.getOperatorState("concat", "", false).value());
                assertEquals((Integer) 7, ((StatefulMapper) 
restoredMap.getUserFunction()).checkpointedCounter);
 
        }
@@ -176,12 +176,12 @@ public class StatefulOperatorTest {
 
                @Override
                public String map(Integer value) throws Exception {
-                       counter.updateState(counter.getState() + 1);
-                       groupCounter.updateState(groupCounter.getState() + 1);
-                       concat.updateState(concat.getState() + 
value.toString());
+                       counter.update(counter.value() + 1);
+                       groupCounter.update(groupCounter.value() + 1);
+                       concat.update(concat.value() + value.toString());
                        checkpointedCounter++;
                        try {
-                               counter.updateState(null);
+                               counter.update(null);
                                fail();
                        } catch (RuntimeException e){
                        }
@@ -235,7 +235,7 @@ public class StatefulOperatorTest {
                
                @Override
                public String map(Integer value) throws Exception {
-                       groupCounter.updateState(groupCounter.getState() + 1);
+                       groupCounter.update(groupCounter.value() + 1);
                        
                        return value.toString();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index e2430d6..a826eff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -198,7 +198,7 @@ public class StreamCheckpointingITCase {
                static final long[] counts = new long[PARALLELISM];
                @Override
                public void close() throws IOException {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
index.getState();
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
index.value();
                }
 
 
@@ -222,8 +222,8 @@ public class StreamCheckpointingITCase {
                public void run(SourceContext<String> ctx) throws Exception {
                        final Object lockingObject = ctx.getCheckpointLock();
 
-                       while (isRunning && index.getState() < numElements) {
-                               char first = (char) ((index.getState() % 40) + 
40);
+                       while (isRunning && index.value() < numElements) {
+                               char first = (char) ((index.value() % 40) + 40);
 
                                stringBuilder.setLength(0);
                                stringBuilder.append(first);
@@ -231,7 +231,7 @@ public class StreamCheckpointingITCase {
                                String result = randomString(stringBuilder, 
rnd);
 
                                synchronized (lockingObject) {
-                                       index.updateState(index.getState() + 
step);
+                                       index.update(index.value() + step);
                                        ctx.collect(result);
                                }
                        }
@@ -261,7 +261,7 @@ public class StreamCheckpointingITCase {
 
                @Override
                public PrefixCount map(PrefixCount value) throws Exception {
-                       count.updateState(count.getState() + 1);
+                       count.update(count.value() + 1);
                        return value;
                }
 
@@ -272,7 +272,7 @@ public class StreamCheckpointingITCase {
 
                @Override
                public void close() throws IOException {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count.getState();
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count.value();
                }
                
        }
@@ -370,7 +370,7 @@ public class StreamCheckpointingITCase {
                
                @Override
                public PrefixCount map(String value) throws IOException {
-                       count.updateState(count.getState() + 1);
+                       count.update(count.value() + 1);
                        return new PrefixCount(value.substring(0, 1), value, 
1L);
                }
                
@@ -381,7 +381,7 @@ public class StreamCheckpointingITCase {
 
                @Override
                public void close() throws IOException {
-                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count.getState();
+                       counts[getRuntimeContext().getIndexOfThisSubtask()] = 
count.value();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index d8c925d..decc861 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -133,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends 
AbstractProcessFailur
                        final File proceedFile = new File(coordinateDir, 
PROCEED_MARKER_FILE);
                        boolean checkForProceedFile = true;
 
-                       while (isRunning && collected.getState() < toCollect) {
+                       while (isRunning && collected.value() < toCollect) {
                                // check if the proceed file exists (then we go 
full speed)
                                // if not, we always recheck and sleep
                                if (checkForProceedFile) {
@@ -146,8 +146,8 @@ public class ProcessFailureStreamingRecoveryITCase extends 
AbstractProcessFailur
                                }
 
                                synchronized (checkpointLock) {
-                                       sourceCtx.collect(collected.getState() 
* stepSize + congruence);
-                                       
collected.updateState(collected.getState() + 1);
+                                       sourceCtx.collect(collected.value() * 
stepSize + congruence);
+                                       collected.update(collected.value() + 1);
                                }
                        }
                }

Reply via email to