Repository: flink
Updated Branches:
  refs/heads/master 64aa7c899 -> de6a3d33e


[FLINK-4196] [runtime] Remove the 'recoveryTimestamp' from checkpoint restores.

The 'recoveryTimestamp' was an unsafe wall clock timestamp attached by the master
upon recovery. Since this timestamp cannot be relied upon in distributed setups,
it is removed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de6a3d33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de6a3d33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de6a3d33

Branch: refs/heads/master
Commit: de6a3d33ecfa689fd0da1ef661bbf6edb68e9d0b
Parents: 2477161
Author: Stephan Ewen <[email protected]>
Authored: Wed Jul 13 17:31:35 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 14 21:11:48 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 13 ++++------
 .../operator/AbstractCEPPatternOperator.java    |  4 ++--
 .../AbstractKeyedCEPPatternOperator.java        |  4 ++--
 .../checkpoint/CheckpointCoordinator.java       |  4 +---
 .../checkpoint/SavepointCoordinator.java        |  4 +---
 .../deployment/TaskDeploymentDescriptor.java    | 13 ++--------
 .../flink/runtime/executiongraph/Execution.java |  7 +-----
 .../runtime/executiongraph/ExecutionVertex.java |  4 +---
 .../runtime/jobgraph/tasks/StatefulTask.java    |  3 +--
 .../runtime/state/AbstractStateBackend.java     |  5 ++--
 .../state/AsynchronousKvStateSnapshot.java      |  3 +--
 .../runtime/state/GenericFoldingState.java      |  9 ++++---
 .../flink/runtime/state/GenericListState.java   |  5 ++--
 .../runtime/state/GenericReducingState.java     |  5 ++--
 .../flink/runtime/state/KvStateSnapshot.java    |  4 +---
 .../apache/flink/runtime/state/StateUtils.java  | 12 ++++------
 .../filesystem/AbstractFsStateSnapshot.java     |  3 +--
 .../state/memory/AbstractMemStateSnapshot.java  |  2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  6 +----
 .../checkpoint/CheckpointStateRestoreTest.java  | 10 ++++----
 .../checkpoint/SavepointCoordinatorTest.java    |  3 +--
 .../runtime/state/StateBackendTestBase.java     | 25 ++++++++++----------
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  4 +---
 .../source/ContinuousFileReaderOperator.java    |  5 ++--
 .../api/operators/AbstractStreamOperator.java   |  4 ++--
 .../operators/AbstractUdfStreamOperator.java    |  4 ++--
 .../streaming/api/operators/StreamOperator.java |  4 +---
 .../operators/GenericWriteAheadSink.java        |  4 ++--
 ...ractAlignedProcessingTimeWindowOperator.java |  4 ++--
 .../operators/windowing/WindowOperator.java     |  4 ++--
 .../streaming/runtime/tasks/StreamTask.java     | 11 ++++-----
 .../operators/WriteAheadSinkTestBase.java       |  2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 ++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 ++--
 .../tasks/StreamTaskAsyncCheckpointTest.java    |  4 ++--
 .../util/OneInputStreamOperatorTestHarness.java |  4 ++--
 36 files changed, 83 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4c44249..4778aa0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.HDFSCopyFromLocal;
 import org.apache.flink.streaming.util.HDFSCopyToLocal;
 import org.apache.hadoop.fs.FileSystem;
@@ -162,7 +163,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
         * checkpoints and when disposing the db. Otherwise, the asynchronous 
snapshot might try
         * iterating over a disposed db.
         */
-       private Object dbCleanupLock;
+       private final SerializableObject dbCleanupLock = new 
SerializableObject();
 
        /**
         * Information about the k/v states as we create them. This is used to 
retrieve the
@@ -289,8 +290,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                        throw new RuntimeException("Error cleaning RocksDB data 
directory.", e);
                }
 
-               dbCleanupLock = new Object();
-
                List<ColumnFamilyDescriptor> columnFamilyDescriptors = new 
ArrayList<>(1);
                // RocksDB seems to need this...
                columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("default".getBytes()));
@@ -479,7 +478,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public final void injectKeyValueStateSnapshots(HashMap<String, 
KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws 
Exception {
+       public final void injectKeyValueStateSnapshots(HashMap<String, 
KvStateSnapshot> keyValueStateSnapshots) throws Exception {
                if (keyValueStateSnapshots.size() == 0) {
                        return;
                }
@@ -670,8 +669,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                public final KvState<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
                                RocksDBStateBackend stateBackend,
                                TypeSerializer<Object> keySerializer,
-                               ClassLoader classLoader,
-                               long recoveryTimestamp) throws Exception {
+                               ClassLoader classLoader) throws Exception {
                        throw new RuntimeException("Should never happen.");
                }
 
@@ -807,8 +805,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                public final KvState<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
                                RocksDBStateBackend stateBackend,
                                TypeSerializer<Object> keySerializer,
-                               ClassLoader classLoader,
-                               long recoveryTimestamp) throws Exception {
+                               ClassLoader classLoader) throws Exception {
                        throw new RuntimeException("Should never happen.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 753656f..8150eae 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -118,8 +118,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
 
        @Override
        @SuppressWarnings("unchecked")
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
-               super.restoreState(state, recoveryTimestamp);
+       public void restoreState(StreamTaskState state) throws Exception {
+               super.restoreState(state);
 
                StreamStateHandle stream = 
(StreamStateHandle)state.getOperatorState();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 83892ca..9ffe9b6 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -186,8 +186,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
        }
 
        @Override
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
-               super.restoreState(state, recoveryTimestamp);
+       public void restoreState(StreamTaskState state) throws Exception {
+               super.restoreState(state);
 
                @SuppressWarnings("unchecked")
                StateHandle<DataInputView> stateHandle = 
(StateHandle<DataInputView>) state.getOperatorState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c599e5a..c6b2a77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -801,8 +801,6 @@ public class CheckpointCoordinator {
                                }
                        }
 
-                       long recoveryTimestamp = System.currentTimeMillis();
-
                        for (Map.Entry<JobVertexID, TaskState> 
taskGroupStateEntry: latest.getTaskStates().entrySet()) {
                                TaskState taskState = 
taskGroupStateEntry.getValue();
                                ExecutionJobVertex executionJobVertex = 
tasks.get(taskGroupStateEntry.getKey());
@@ -833,7 +831,7 @@ public class CheckpointCoordinator {
                                                Map<Integer, 
SerializedValue<StateHandle<?>>> kvStateForTaskMap = 
taskState.getUnwrappedKvStates(keyGroupPartitions.get(i));
 
                                                Execution 
currentExecutionAttempt = 
executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt();
-                                               
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, 
recoveryTimestamp);
+                                               
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
                                        }
 
                                        if (allOrNothingState && counter > 0 && 
counter < executionJobVertex.getParallelism()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 2c348ea..b96a02a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -188,8 +188,6 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
                                throw new 
IllegalStateException("CheckpointCoordinator is shut down");
                        }
 
-                       long recoveryTimestamp = System.currentTimeMillis();
-
                        LOG.info("Rolling back to savepoint '{}'.", 
savepointPath);
 
                        CompletedCheckpoint checkpoint = 
savepointStore.getState(savepointPath);
@@ -237,7 +235,7 @@ public class SavepointCoordinator extends 
CheckpointCoordinator {
                                                        .getTaskVertices()[i]
                                                        
.getCurrentExecutionAttempt();
 
-                                               
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, 
recoveryTimestamp);
+                                               
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
                                        }
                                } else {
                                        String msg = String.format("Failed to 
rollback to savepoint %s. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index f595681..60fb45c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -93,8 +93,6 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        /** The execution configuration (see {@link ExecutionConfig}) related 
to the specific job. */
        private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
 
-       private long recoveryTimestamp;
-               
        /**
         * Constructs a task deployment descriptor.
         */
@@ -116,8 +114,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        List<BlobKey> requiredJarFiles,
                        List<URL> requiredClasspaths,
                        int targetSlotNumber,
-                       SerializedValue<StateHandle<?>> operatorState,
-                       long recoveryTimestamp) {
+                       SerializedValue<StateHandle<?>> operatorState) {
 
                checkArgument(indexInSubtaskGroup >= 0);
                checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -142,7 +139,6 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.requiredClasspaths = checkNotNull(requiredClasspaths);
                this.targetSlotNumber = targetSlotNumber;
                this.operatorState = operatorState;
-               this.recoveryTimestamp = recoveryTimestamp;
        }
 
        public TaskDeploymentDescriptor(
@@ -182,8 +178,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        requiredJarFiles,
                        requiredClasspaths,
                        targetSlotNumber,
-                       null,
-                       -1);
+                       null);
        }
 
        /**
@@ -324,8 +319,4 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        public SerializedValue<StateHandle<?>> getOperatorState() {
                return operatorState;
        }
-       
-       public long getRecoveryTimestamp() {
-               return recoveryTimestamp;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 691adaf..1b32100 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -139,8 +139,6 @@ public class Execution implements Serializable {
        private SerializedValue<StateHandle<?>> operatorState;
 
        private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState;
-       
-       private long recoveryTimestamp;
 
        /** The execution context which is used to execute futures. */
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
@@ -239,15 +237,13 @@ public class Execution implements Serializable {
 
        public void setInitialState(
                SerializedValue<StateHandle<?>> initialState,
-               Map<Integer, SerializedValue<StateHandle<?>>> initialKvState,
-               long recoveryTimestamp) {
+               Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
 
                if (state != ExecutionState.CREATED) {
                        throw new IllegalArgumentException("Can only assign 
operator state when execution attempt is in CREATED");
                }
                this.operatorState = initialState;
                this.operatorKvState = initialKvState;
-               this.recoveryTimestamp = recoveryTimestamp;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -376,7 +372,6 @@ public class Execution implements Serializable {
                                slot,
                                operatorState,
                                operatorKvState,
-                               recoveryTimestamp,
                                attemptNumber);
 
                        // register this execution at the execution graph, to 
receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a85f32a..e20f466 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -638,7 +638,6 @@ public class ExecutionVertex implements Serializable {
                        SimpleSlot targetSlot,
                        SerializedValue<StateHandle<?>> operatorState,
                        Map<Integer, SerializedValue<StateHandle<?>>> 
operatorKvState,
-                       long recoveryTimestamp,
                        int attemptNumber) {
 
                // Produced intermediate results
@@ -689,8 +688,7 @@ public class ExecutionVertex implements Serializable {
                        jarFiles,
                        classpaths,
                        targetSlot.getRoot().getSlotNumber(),
-                       operatorState,
-                       recoveryTimestamp);
+                       operatorState);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index aca1bc2..f8bba1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -31,9 +31,8 @@ public interface StatefulTask<T extends StateHandle<?>> {
         * a snapshot of the state from a previous execution.
         * 
         * @param stateHandle The handle to the state.
-        * @param recoveryTimestamp Global recovery timestamp.
         */
-       void setInitialState(T stateHandle, long recoveryTimestamp) throws 
Exception;
+       void setInitialState(T stateHandle) throws Exception;
 
        /**
         * This method is either called directly and asynchronously by the 
checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 95ca13f..6ab4999 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -341,7 +341,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
         * @param keyValueStateSnapshots The Map of snapshots
         */
        @SuppressWarnings("unchecked,rawtypes")
-       public void injectKeyValueStateSnapshots(HashMap<String, 
KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws 
Exception {
+       public void injectKeyValueStateSnapshots(HashMap<String, 
KvStateSnapshot> keyValueStateSnapshots) throws Exception {
                if (keyValueStateSnapshots != null) {
                        if (keyValueStatesByName == null) {
                                keyValueStatesByName = new HashMap<>();
@@ -350,8 +350,7 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        for (Map.Entry<String, KvStateSnapshot> state : 
keyValueStateSnapshots.entrySet()) {
                                KvState kvState = 
state.getValue().restoreState(this,
                                        keySerializer,
-                                       userCodeClassLoader,
-                                       recoveryTimestamp);
+                                       userCodeClassLoader);
                                keyValueStatesByName.put(state.getKey(), 
kvState);
                        }
                        keyValueStates = 
keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
index 30a9c5a..877034d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
@@ -45,8 +45,7 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S 
extends State, SD exte
        public final KvState<K, N, S, SD, Backend> restoreState(
                Backend stateBackend,
                TypeSerializer<K> keySerializer,
-               ClassLoader classLoader,
-               long recoveryTimestamp) throws Exception {
+               ClassLoader classLoader) throws Exception {
                throw new RuntimeException("This should never be called and 
probably points to a bug.");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
index ef1d796..762cc3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
@@ -113,11 +113,10 @@ public class GenericFoldingState<K, N, T, ACC, Backend 
extends AbstractStateBack
                @Override
                @SuppressWarnings("unchecked")
                public KvState<K, N, FoldingState<T, ACC>, 
FoldingStateDescriptor<T, ACC>, Backend> restoreState(
-                       Backend stateBackend,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader classLoader,
-                       long recoveryTimestamp) throws Exception {
-                       return new GenericFoldingState((ValueState<ACC>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, 
recoveryTimestamp), foldFunction);
+                               Backend stateBackend,
+                               TypeSerializer<K> keySerializer,
+                               ClassLoader classLoader) throws Exception {
+                       return new GenericFoldingState((ValueState<ACC>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), 
foldFunction);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index fbb0170..9393082 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -120,9 +120,8 @@ public class GenericListState<K, N, T, Backend extends 
AbstractStateBackend, W e
                public KvState<K, N, ListState<T>, ListStateDescriptor<T>, 
Backend> restoreState(
                        Backend stateBackend,
                        TypeSerializer<K> keySerializer,
-                       ClassLoader classLoader,
-                       long recoveryTimestamp) throws Exception {
-                       return new GenericListState((ValueState<T>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, 
recoveryTimestamp));
+                       ClassLoader classLoader) throws Exception {
+                       return new GenericListState((ValueState<T>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
index 102e25e..7407dfa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -118,9 +118,8 @@ public class GenericReducingState<K, N, T, Backend extends 
AbstractStateBackend,
                public KvState<K, N, ReducingState<T>, 
ReducingStateDescriptor<T>, Backend> restoreState(
                        Backend stateBackend,
                        TypeSerializer<K> keySerializer,
-                       ClassLoader classLoader,
-                       long recoveryTimestamp) throws Exception {
-                       return new GenericReducingState((ValueState<T>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, 
recoveryTimestamp), reduceFunction);
+                       ClassLoader classLoader) throws Exception {
+                       return new GenericReducingState((ValueState<T>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), 
reduceFunction);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index 245427e..847d53e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -48,7 +48,6 @@ public interface KvStateSnapshot<K, N, S extends State, SD 
extends StateDescript
         *                     from this snapshot.
         * @param keySerializer The serializer for the keys.
         * @param classLoader The class loader for user-defined types.
-        * @param recoveryTimestamp The timestamp of the checkpoint we are 
recovering from.
         *
         * @return An instance of the key/value state loaded from this snapshot.
         * 
@@ -57,8 +56,7 @@ public interface KvStateSnapshot<K, N, S extends State, SD 
extends StateDescript
        KvState<K, N, S, SD, Backend> restoreState(
                Backend stateBackend,
                TypeSerializer<K> keySerializer,
-               ClassLoader classLoader,
-               long recoveryTimestamp) throws Exception;
+               ClassLoader classLoader) throws Exception;
 
        /**
         * Discards the state snapshot, removing any resources occupied by it.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 96e0eb5..b130c70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -38,24 +38,22 @@ public class StateUtils {
         *            The state carrier operator.
         * @param state
         *            The state handle.
-        * @param recoveryTimestamp
-        *            Global recovery timestamp
         * @param <T>
         *            Type bound for the
         */
-       public static <T extends StateHandle<?>> void 
setOperatorState(StatefulTask<?> op,
-                       StateHandle<?> state, long recoveryTimestamp) throws 
Exception {
+       public static <T extends StateHandle<?>> void 
setOperatorState(StatefulTask<?> op, StateHandle<?> state)
+                       throws Exception {
+
                @SuppressWarnings("unchecked")
                StatefulTask<T> typedOp = (StatefulTask<T>) op;
                @SuppressWarnings("unchecked")
                T typedHandle = (T) state;
 
-               typedOp.setInitialState(typedHandle, recoveryTimestamp);
+               typedOp.setInitialState(typedHandle);
        }
 
        // 
------------------------------------------------------------------------
 
        /** Do not instantiate */
-       private StateUtils() {
-       }
+       private StateUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 432a9e6..cd02870 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -83,8 +83,7 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S 
extends State, SD exte
        public KvState<K, N, S, SD, FsStateBackend> restoreState(
                FsStateBackend stateBackend,
                final TypeSerializer<K> keySerializer,
-               ClassLoader classLoader,
-               long recoveryTimestamp) throws Exception {
+               ClassLoader classLoader) throws Exception {
 
                // validity checks
                if (!this.keySerializer.equals(keySerializer)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
index 5d4f0d8..86d4c7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -82,7 +82,7 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S 
extends State, SD ext
        public KvState<K, N, S, SD, MemoryStateBackend> restoreState(
                MemoryStateBackend stateBackend,
                final TypeSerializer<K> keySerializer,
-               ClassLoader classLoader, long recoveryTimestamp) throws 
Exception {
+               ClassLoader classLoader) throws Exception {
 
                // validity checks
                if (!this.keySerializer.equals(keySerializer)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 58eb90c..25e4b43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -226,8 +226,6 @@ public class Task implements Runnable {
         * initialization, to be memory friendly */
        private volatile SerializedValue<StateHandle<?>> operatorState;
 
-       private volatile long recoveryTs;
-
        /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
        private long taskCancellationInterval;
 
@@ -259,7 +257,6 @@ public class Task implements Runnable {
                this.requiredClasspaths = 
checkNotNull(tdd.getRequiredClasspaths());
                this.nameOfInvokableClass = 
checkNotNull(tdd.getInvokableClassName());
                this.operatorState = tdd.getOperatorState();
-               this.recoveryTs = tdd.getRecoveryTimestamp();
                this.serializedExecutionConfig = 
checkNotNull(tdd.getSerializedExecutionConfig());
 
                this.taskCancellationInterval = jobConfiguration.getLong(
@@ -538,14 +535,13 @@ public class Task implements Runnable {
 
                        // get our private reference onto the stack (be safe 
against concurrent changes)
                        SerializedValue<StateHandle<?>> operatorState = 
this.operatorState;
-                       long recoveryTs = this.recoveryTs;
 
                        if (operatorState != null) {
                                if (invokable instanceof StatefulTask) {
                                        try {
                                                StateHandle<?> state = 
operatorState.deserializeValue(userCodeClassLoader);
                                                StatefulTask<?> op = 
(StatefulTask<?>) invokable;
-                                               StateUtils.setOperatorState(op, 
state, recoveryTs);
+                                               StateUtils.setOperatorState(op, 
state);
                                        }
                                        catch (Exception e) {
                                                throw new 
RuntimeException("Failed to deserialize state handle and setup initial operator 
state.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 68cd145..2b1b7e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -113,11 +113,11 @@ public class CheckpointStateRestoreTest {
                        coord.restoreLatestCheckpointedState(map, true, false);
 
                        // verify that each stateful vertex got the state
-                       verify(statefulExec1, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-                       verify(statefulExec2, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-                       verify(statefulExec3, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-                       verify(statelessExec1, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), 
Mockito.anyLong());
-                       verify(statelessExec2, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), 
Mockito.anyLong());
+                       verify(statefulExec1, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
+                       verify(statefulExec2, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
+                       verify(statefulExec3, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
+                       verify(statelessExec1, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+                       verify(statelessExec2, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 405fd07..384ed42 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -58,7 +58,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -202,7 +201,7 @@ public class SavepointCoordinatorTest extends TestLogger {
                // Verify all executions have been reset
                for (ExecutionVertex vertex : ackVertices) {
                        verify(vertex.getCurrentExecutionAttempt(), 
times(1)).setInitialState(
-                                       any(SerializedValue.class), 
any(Map.class), anyLong());
+                                       any(SerializedValue.class), 
any(Map.class));
                }
 
                // Verify all promises removed

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index cc36f4a..12cf112 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.*;
 /**
  * Generic tests for the partitioned state part of {@link 
AbstractStateBackend}.
  */
+@SuppressWarnings("serial")
 public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
        protected B backend;
@@ -129,7 +130,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                backend.dispose();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
-               backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+               backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
                for (String key: snapshot1.keySet()) {
                        snapshot1.get(key).discardState();
@@ -145,7 +146,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                backend.dispose();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
-               backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
+               backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
 
                for (String key: snapshot2.keySet()) {
                        snapshot2.get(key).discardState();
@@ -221,7 +222,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                backend.dispose();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
-               backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+               backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
                for (String key: snapshot1.keySet()) {
                        snapshot1.get(key).discardState();
@@ -288,7 +289,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();
@@ -305,7 +306,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2);
 
                        for (String key: snapshot2.keySet()) {
                                snapshot2.get(key).discardState();
@@ -383,7 +384,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();
@@ -400,7 +401,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2);
 
                        for (String key: snapshot2.keySet()) {
                                snapshot2.get(key).discardState();
@@ -484,7 +485,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();
@@ -501,7 +502,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot2);
 
                        for (String key: snapshot2.keySet()) {
                                snapshot2.get(key).discardState();
@@ -553,7 +554,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();
@@ -612,7 +613,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();
@@ -674,7 +675,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1, 100);
+                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot1);
 
                        for (String key: snapshot1.keySet()) {
                                snapshot1.get(key).discardState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7b55987..4b90b88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -201,9 +201,7 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void setInitialState(StateHandle<Serializable> 
stateHandle, long ts) throws Exception {
-
-               }
+               public void setInitialState(StateHandle<Serializable> 
stateHandle) throws Exception {}
 
                @Override
                public boolean triggerCheckpoint(long checkpointId, long 
timestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index b13e7a8..fda5efd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -406,8 +406,8 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        }
 
        @Override
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
-               super.restoreState(state, recoveryTimestamp);
+       public void restoreState(StreamTaskState state) throws Exception {
+               super.restoreState(state);
 
                StreamStateHandle stream = (StreamStateHandle) 
state.getOperatorState();
 
@@ -427,6 +427,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                }
 
                // read the state of the format
+               @SuppressWarnings("unchecked")
                S formatState = (S) ois.readObject();
 
                // set the whole reader state for the open() to find.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7755347..0269a34 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -195,11 +195,11 @@ public abstract class AbstractStreamOperator<OUT>
        
        @Override
        @SuppressWarnings("rawtypes,unchecked")
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
+       public void restoreState(StreamTaskState state) throws Exception {
                // restore the key/value state. the actual restore happens 
lazily, when the function requests
                // the state again, because the restore method needs 
information provided by the user function
                if (stateBackend != null) {
-                       
stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates(), 
recoveryTimestamp);
+                       
stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 86b07d6..1ddd934 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -150,8 +150,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function> extends
        }
 
        @Override
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
-               super.restoreState(state, recoveryTimestamp);
+       public void restoreState(StreamTaskState state) throws Exception {
+               super.restoreState(state);
                
                StateHandle<Serializable> stateHandle =  
state.getFunctionState();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 4572ef1..3e38165 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -115,13 +115,11 @@ public interface StreamOperator<OUT> extends Serializable 
{
         *
         * @param state The state of operator that was snapshotted as part of 
checkpoint
         *              from which the execution is restored.
-        * 
-        * @param recoveryTimestamp Global recovery timestamp
         *
         * @throws Exception Exceptions during state restore should be 
forwarded, so that the system can
         *                   properly react to failed state restore and fail 
the execution attempt.
         */
-       void restoreState(StreamTaskState state, long recoveryTimestamp) throws 
Exception;
+       void restoreState(StreamTaskState state) throws Exception;
 
        /**
         * Called when the checkpoint with the given ID is completed and 
acknowledged on the JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 5545717..b268c7a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -109,8 +109,8 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        }
 
        @Override
-       public void restoreState(StreamTaskState state, long recoveryTimestamp) 
throws Exception {
-               super.restoreState(state, recoveryTimestamp);
+       public void restoreState(StreamTaskState state) throws Exception {
+               super.restoreState(state);
                this.state = (ExactlyOnceState) state.getFunctionState();
                out = null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 32c4e67..fdc8117 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -261,8 +261,8 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
        }
 
        @Override
-       public void restoreState(StreamTaskState taskState, long 
recoveryTimestamp) throws Exception {
-               super.restoreState(taskState, recoveryTimestamp);
+       public void restoreState(StreamTaskState taskState) throws Exception {
+               super.restoreState(taskState);
 
                @SuppressWarnings("unchecked")
                StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b6ca564..bb05d2b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -846,8 +846,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        @Override
-       public void restoreState(StreamTaskState taskState, long 
recoveryTimestamp) throws Exception {
-               super.restoreState(taskState, recoveryTimestamp);
+       public void restoreState(StreamTaskState taskState) throws Exception {
+               super.restoreState(taskState);
 
                final ClassLoader userClassloader = getUserCodeClassloader();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a5de312..6ad94b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -156,8 +156,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        /** Flag to mark this task as canceled */
        private volatile boolean canceled;
 
-       private long recoveryTimestamp;
-
        private long lastCheckpointSize = 0;
 
        // 
------------------------------------------------------------------------
@@ -498,13 +496,12 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        // 
------------------------------------------------------------------------
        //  Checkpoint and Restore
        // 
------------------------------------------------------------------------
-       
+
        @Override
-       public void setInitialState(StreamTaskStateList initialState, long 
recoveryTimestamp) {
+       public void setInitialState(StreamTaskStateList initialState) {
                lazyRestoreState = initialState;
-               this.recoveryTimestamp = recoveryTimestamp;
        }
-       
+
        private void restoreState() throws Exception {
                if (lazyRestoreState != null) {
                        LOG.info("Restoring checkpointed state to task {}", 
getName());
@@ -522,7 +519,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                        
                                        if (state != null && operator != null) {
                                                LOG.debug("Task {} in chain 
({}) has checkpointed state", i, getName());
-                                               operator.restoreState(state, recoveryTimestamp);
+                                               operator.restoreState(state);
                                        }
                                        else if (operator != null) {
                                                LOG.debug("Task {} in chain 
({}) does not have checkpointed state", i, getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 3f3a387..1d706d1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -201,7 +201,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                task.getOperator().close();
                task.getOperator().open();
 
-               task.getOperator().restoreState(states.get(states.size() - 1), 0);
+               task.getOperator().restoreState(states.get(states.size() - 1));
 
                for (int x = 0; x < 20; x++) {
                        testHarness.processElement(new 
StreamRecord<>(generateValue(elementCounter, 2)));

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index eb087c6..d2f8e05 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -512,7 +512,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, windowSize);
 
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), out2);
-                       op.restoreState(state, 1);
+                       op.restoreState(state);
                        op.open();
 
                        // inject some more elements
@@ -609,7 +609,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                        windowSize, windowSlide);
 
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), out2);
-                       op.restoreState(state, 1);
+                       op.restoreState(state);
                        op.open();
                        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index af46513..585eaa7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -609,7 +609,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        windowSize, windowSize);
 
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), out2);
-                       op.restoreState(state, 1);
+                       op.restoreState(state);
                        op.open();
 
                        // inject the remaining elements
@@ -717,7 +717,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        windowSize, windowSlide);
 
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), out2);
-                       op.restoreState(state, 1);
+                       op.restoreState(state);
                        op.open();
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index 8e1cadf..b74903a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -172,8 +172,8 @@ public class StreamTaskAsyncCheckpointTest {
                }
 
                @Override
-               public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-                       super.restoreState(taskState, recoveryTimestamp);
+               public void restoreState(StreamTaskState taskState) throws Exception {
+                       super.restoreState(taskState);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66bdb57..7d0fc57 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -195,10 +195,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState, long)} ()}
+        * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)} ()}
         */
        public void restore(StreamTaskState snapshot, long recoveryTimestamp) 
throws Exception {
-               operator.restoreState(snapshot, recoveryTimestamp);
+               operator.restoreState(snapshot);
        }
 
        /**

Reply via email to