Repository: flink Updated Branches: refs/heads/master 64aa7c899 -> de6a3d33e
[FLINK-4196] [runtime] Remove the 'recoveryTimestamp' from checkpoint restores. The 'recoveryTimestamp' was an unsafe wall clock timestamp attached by the master upon recovery. Because this timestamp cannot be relied upon in distributed setups, it is removed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de6a3d33 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de6a3d33 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de6a3d33 Branch: refs/heads/master Commit: de6a3d33ecfa689fd0da1ef661bbf6edb68e9d0b Parents: 2477161 Author: Stephan Ewen <[email protected]> Authored: Wed Jul 13 17:31:35 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 14 21:11:48 2016 +0200 ---------------------------------------------------------------------- .../streaming/state/RocksDBStateBackend.java | 13 ++++------ .../operator/AbstractCEPPatternOperator.java | 4 ++-- .../AbstractKeyedCEPPatternOperator.java | 4 ++-- .../checkpoint/CheckpointCoordinator.java | 4 +--- .../checkpoint/SavepointCoordinator.java | 4 +--- .../deployment/TaskDeploymentDescriptor.java | 13 ++-------- .../flink/runtime/executiongraph/Execution.java | 7 +----- .../runtime/executiongraph/ExecutionVertex.java | 4 +--- .../runtime/jobgraph/tasks/StatefulTask.java | 3 +-- .../runtime/state/AbstractStateBackend.java | 5 ++-- .../state/AsynchronousKvStateSnapshot.java | 3 +-- .../runtime/state/GenericFoldingState.java | 9 ++++--- .../flink/runtime/state/GenericListState.java | 5 ++-- .../runtime/state/GenericReducingState.java | 5 ++-- .../flink/runtime/state/KvStateSnapshot.java | 4 +--- .../apache/flink/runtime/state/StateUtils.java | 12 ++++------ .../filesystem/AbstractFsStateSnapshot.java | 3 +-- .../state/memory/AbstractMemStateSnapshot.java | 2 +- .../apache/flink/runtime/taskmanager/Task.java | 6 +---- .../checkpoint/CheckpointStateRestoreTest.java | 10 ++++---- .../checkpoint/SavepointCoordinatorTest.java | 3 
+-- .../runtime/state/StateBackendTestBase.java | 25 ++++++++++---------- .../runtime/taskmanager/TaskAsyncCallTest.java | 4 +--- .../source/ContinuousFileReaderOperator.java | 5 ++-- .../api/operators/AbstractStreamOperator.java | 4 ++-- .../operators/AbstractUdfStreamOperator.java | 4 ++-- .../streaming/api/operators/StreamOperator.java | 4 +--- .../operators/GenericWriteAheadSink.java | 4 ++-- ...ractAlignedProcessingTimeWindowOperator.java | 4 ++-- .../operators/windowing/WindowOperator.java | 4 ++-- .../streaming/runtime/tasks/StreamTask.java | 11 ++++----- .../operators/WriteAheadSinkTestBase.java | 2 +- ...AlignedProcessingTimeWindowOperatorTest.java | 4 ++-- ...AlignedProcessingTimeWindowOperatorTest.java | 4 ++-- .../tasks/StreamTaskAsyncCheckpointTest.java | 4 ++-- .../util/OneInputStreamOperatorTestHarness.java | 4 ++-- 36 files changed, 83 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 4c44249..4778aa0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.api.common.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import 
org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.util.HDFSCopyFromLocal; import org.apache.flink.streaming.util.HDFSCopyToLocal; import org.apache.hadoop.fs.FileSystem; @@ -162,7 +163,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try * iterating over a disposed db. */ - private Object dbCleanupLock; + private final SerializableObject dbCleanupLock = new SerializableObject(); /** * Information about the k/v states as we create them. This is used to retrieve the @@ -289,8 +290,6 @@ public class RocksDBStateBackend extends AbstractStateBackend { throw new RuntimeException("Error cleaning RocksDB data directory.", e); } - dbCleanupLock = new Object(); - List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); // RocksDB seems to need this... columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes())); @@ -479,7 +478,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override - public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception { + public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception { if (keyValueStateSnapshots.size() == 0) { return; } @@ -670,8 +669,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState( RocksDBStateBackend stateBackend, TypeSerializer<Object> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { + ClassLoader classLoader) throws Exception { throw new RuntimeException("Should never happen."); } @@ -807,8 +805,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { public final KvState<Object, Object, ValueState<Object>, 
ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState( RocksDBStateBackend stateBackend, TypeSerializer<Object> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { + ClassLoader classLoader) throws Exception { throw new RuntimeException("Should never happen."); } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index 753656f..8150eae 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -118,8 +118,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas @Override @SuppressWarnings("unchecked") - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { - super.restoreState(state, recoveryTimestamp); + public void restoreState(StreamTaskState state) throws Exception { + super.restoreState(state); StreamStateHandle stream = (StreamStateHandle)state.getOperatorState(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 83892ca..9ffe9b6 100644 --- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -186,8 +186,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst } @Override - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { - super.restoreState(state, recoveryTimestamp); + public void restoreState(StreamTaskState state) throws Exception { + super.restoreState(state); @SuppressWarnings("unchecked") StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state.getOperatorState(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c599e5a..c6b2a77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -801,8 +801,6 @@ public class CheckpointCoordinator { } } - long recoveryTimestamp = System.currentTimeMillis(); - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry: latest.getTaskStates().entrySet()) { TaskState taskState = taskGroupStateEntry.getValue(); ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey()); @@ -833,7 +831,7 @@ public class CheckpointCoordinator { Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState.getUnwrappedKvStates(keyGroupPartitions.get(i)); Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt(); - 
currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, recoveryTimestamp); + currentExecutionAttempt.setInitialState(state, kvStateForTaskMap); } if (allOrNothingState && counter > 0 && counter < executionJobVertex.getParallelism()) { http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java index 2c348ea..b96a02a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java @@ -188,8 +188,6 @@ public class SavepointCoordinator extends CheckpointCoordinator { throw new IllegalStateException("CheckpointCoordinator is shut down"); } - long recoveryTimestamp = System.currentTimeMillis(); - LOG.info("Rolling back to savepoint '{}'.", savepointPath); CompletedCheckpoint checkpoint = savepointStore.getState(savepointPath); @@ -237,7 +235,7 @@ public class SavepointCoordinator extends CheckpointCoordinator { .getTaskVertices()[i] .getCurrentExecutionAttempt(); - currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, recoveryTimestamp); + currentExecutionAttempt.setInitialState(state, kvStateForTaskMap); } } else { String msg = String.format("Failed to rollback to savepoint %s. 
" + http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index f595681..60fb45c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -93,8 +93,6 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */ private final SerializedValue<ExecutionConfig> serializedExecutionConfig; - private long recoveryTimestamp; - /** * Constructs a task deployment descriptor. */ @@ -116,8 +114,7 @@ public final class TaskDeploymentDescriptor implements Serializable { List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths, int targetSlotNumber, - SerializedValue<StateHandle<?>> operatorState, - long recoveryTimestamp) { + SerializedValue<StateHandle<?>> operatorState) { checkArgument(indexInSubtaskGroup >= 0); checkArgument(numberOfSubtasks > indexInSubtaskGroup); @@ -142,7 +139,6 @@ public final class TaskDeploymentDescriptor implements Serializable { this.requiredClasspaths = checkNotNull(requiredClasspaths); this.targetSlotNumber = targetSlotNumber; this.operatorState = operatorState; - this.recoveryTimestamp = recoveryTimestamp; } public TaskDeploymentDescriptor( @@ -182,8 +178,7 @@ public final class TaskDeploymentDescriptor implements Serializable { requiredJarFiles, requiredClasspaths, targetSlotNumber, - null, - -1); + null); } /** @@ -324,8 +319,4 @@ public final class TaskDeploymentDescriptor implements Serializable { public 
SerializedValue<StateHandle<?>> getOperatorState() { return operatorState; } - - public long getRecoveryTimestamp() { - return recoveryTimestamp; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 691adaf..1b32100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -139,8 +139,6 @@ public class Execution implements Serializable { private SerializedValue<StateHandle<?>> operatorState; private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState; - - private long recoveryTimestamp; /** The execution context which is used to execute futures. 
*/ @SuppressWarnings("NonSerializableFieldInSerializableClass") @@ -239,15 +237,13 @@ public class Execution implements Serializable { public void setInitialState( SerializedValue<StateHandle<?>> initialState, - Map<Integer, SerializedValue<StateHandle<?>>> initialKvState, - long recoveryTimestamp) { + Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) { if (state != ExecutionState.CREATED) { throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); } this.operatorState = initialState; this.operatorKvState = initialKvState; - this.recoveryTimestamp = recoveryTimestamp; } // -------------------------------------------------------------------------------------------- @@ -376,7 +372,6 @@ public class Execution implements Serializable { slot, operatorState, operatorKvState, - recoveryTimestamp, attemptNumber); // register this execution at the execution graph, to receive call backs http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a85f32a..e20f466 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -638,7 +638,6 @@ public class ExecutionVertex implements Serializable { SimpleSlot targetSlot, SerializedValue<StateHandle<?>> operatorState, Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState, - long recoveryTimestamp, int attemptNumber) { // Produced intermediate results @@ -689,8 +688,7 @@ public class ExecutionVertex implements Serializable { jarFiles, classpaths, 
targetSlot.getRoot().getSlotNumber(), - operatorState, - recoveryTimestamp); + operatorState); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index aca1bc2..f8bba1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -31,9 +31,8 @@ public interface StatefulTask<T extends StateHandle<?>> { * a snapshot of the state from a previous execution. * * @param stateHandle The handle to the state. - * @param recoveryTimestamp Global recovery timestamp. 
*/ - void setInitialState(T stateHandle, long recoveryTimestamp) throws Exception; + void setInitialState(T stateHandle) throws Exception; /** * This method is either called directly and asynchronously by the checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 95ca13f..6ab4999 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -341,7 +341,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @param keyValueStateSnapshots The Map of snapshots */ @SuppressWarnings("unchecked,rawtypes") - public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception { + public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception { if (keyValueStateSnapshots != null) { if (keyValueStatesByName == null) { keyValueStatesByName = new HashMap<>(); @@ -350,8 +350,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) { KvState kvState = state.getValue().restoreState(this, keySerializer, - userCodeClassLoader, - recoveryTimestamp); + userCodeClassLoader); keyValueStatesByName.put(state.getKey(), kvState); } keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]); 
http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java index 30a9c5a..877034d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java @@ -45,8 +45,7 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD exte public final KvState<K, N, S, SD, Backend> restoreState( Backend stateBackend, TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { + ClassLoader classLoader) throws Exception { throw new RuntimeException("This should never be called and probably points to a bug."); } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java index ef1d796..762cc3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java @@ -113,11 +113,10 @@ public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBack @Override @SuppressWarnings("unchecked") public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState( - Backend stateBackend, - TypeSerializer<K> keySerializer, - ClassLoader 
classLoader, - long recoveryTimestamp) throws Exception { - return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), foldFunction); + Backend stateBackend, + TypeSerializer<K> keySerializer, + ClassLoader classLoader) throws Exception { + return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), foldFunction); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java index fbb0170..9393082 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java @@ -120,9 +120,8 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e public KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> restoreState( Backend stateBackend, TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { - return new GenericListState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp)); + ClassLoader classLoader) throws Exception { + return new GenericListState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java index 102e25e..7407dfa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java @@ -118,9 +118,8 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend, public KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> restoreState( Backend stateBackend, TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { - return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), reduceFunction); + ClassLoader classLoader) throws Exception { + return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), reduceFunction); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java index 245427e..847d53e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java @@ -48,7 +48,6 @@ public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescript * from this snapshot. * @param keySerializer The serializer for the keys. * @param classLoader The class loader for user-defined types. - * @param recoveryTimestamp The timestamp of the checkpoint we are recovering from. * * @return An instance of the key/value state loaded from this snapshot. 
* @@ -57,8 +56,7 @@ public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescript KvState<K, N, S, SD, Backend> restoreState( Backend stateBackend, TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception; + ClassLoader classLoader) throws Exception; /** * Discards the state snapshot, removing any resources occupied by it. http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java index 96e0eb5..b130c70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java @@ -38,24 +38,22 @@ public class StateUtils { * The state carrier operator. * @param state * The state handle. 
- * @param recoveryTimestamp - * Global recovery timestamp * @param <T> * Type bound for the */ - public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op, - StateHandle<?> state, long recoveryTimestamp) throws Exception { + public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op, StateHandle<?> state) + throws Exception { + @SuppressWarnings("unchecked") StatefulTask<T> typedOp = (StatefulTask<T>) op; @SuppressWarnings("unchecked") T typedHandle = (T) state; - typedOp.setInitialState(typedHandle, recoveryTimestamp); + typedOp.setInitialState(typedHandle); } // ------------------------------------------------------------------------ /** Do not instantiate */ - private StateUtils() { - } + private StateUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java index 432a9e6..cd02870 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java @@ -83,8 +83,7 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte public KvState<K, N, S, SD, FsStateBackend> restoreState( FsStateBackend stateBackend, final TypeSerializer<K> keySerializer, - ClassLoader classLoader, - long recoveryTimestamp) throws Exception { + ClassLoader classLoader) throws Exception { // validity checks if (!this.keySerializer.equals(keySerializer)) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java index 5d4f0d8..86d4c7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java @@ -82,7 +82,7 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext public KvState<K, N, S, SD, MemoryStateBackend> restoreState( MemoryStateBackend stateBackend, final TypeSerializer<K> keySerializer, - ClassLoader classLoader, long recoveryTimestamp) throws Exception { + ClassLoader classLoader) throws Exception { // validity checks if (!this.keySerializer.equals(keySerializer)) { http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 58eb90c..25e4b43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -226,8 +226,6 @@ public class Task implements Runnable { * initialization, to be memory friendly */ private volatile SerializedValue<StateHandle<?>> operatorState; - private volatile long recoveryTs; - /** Initialized from the Flink configuration. 
May also be set at the ExecutionConfig */ private long taskCancellationInterval; @@ -259,7 +257,6 @@ public class Task implements Runnable { this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths()); this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName()); this.operatorState = tdd.getOperatorState(); - this.recoveryTs = tdd.getRecoveryTimestamp(); this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig()); this.taskCancellationInterval = jobConfiguration.getLong( @@ -538,14 +535,13 @@ public class Task implements Runnable { // get our private reference onto the stack (be safe against concurrent changes) SerializedValue<StateHandle<?>> operatorState = this.operatorState; - long recoveryTs = this.recoveryTs; if (operatorState != null) { if (invokable instanceof StatefulTask) { try { StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader); StatefulTask<?> op = (StatefulTask<?>) invokable; - StateUtils.setOperatorState(op, state, recoveryTs); + StateUtils.setOperatorState(op, state); } catch (Exception e) { throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 68cd145..2b1b7e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -113,11 +113,11 @@ public class CheckpointStateRestoreTest { coord.restoreLatestCheckpointedState(map, true, false); // verify that each 
stateful vertex got the state - verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong()); - verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong()); - verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong()); - verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong()); - verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong()); + verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); + verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); + verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); + verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); + verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any()); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java index 405fd07..384ed42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java @@ -58,7 +58,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; @@ -202,7 +201,7 @@ public class SavepointCoordinatorTest extends TestLogger { // Verify all executions have been reset for (ExecutionVertex vertex : ackVertices) { verify(vertex.getCurrentExecutionAttempt(), times(1)).setInitialState( - any(SerializedValue.class), any(Map.class), anyLong()); + any(SerializedValue.class), any(Map.class)); } // Verify all promises removed http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index cc36f4a..12cf112 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -53,6 +53,7 @@ import static org.junit.Assert.*; /** * Generic tests for the partitioned state part of {@link AbstractStateBackend}. 
*/ +@SuppressWarnings("serial") public abstract class StateBackendTestBase<B extends AbstractStateBackend> { protected B backend; @@ -129,7 +130,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.dispose(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -145,7 +146,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.dispose(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2); for (String key: snapshot2.keySet()) { snapshot2.get(key).discardState(); @@ -221,7 +222,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { backend.dispose(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -288,7 +289,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -305,7 +306,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the second snapshot and validate it backend.initializeForJob(new 
DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2); for (String key: snapshot2.keySet()) { snapshot2.get(key).discardState(); @@ -383,7 +384,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -400,7 +401,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the second snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2); for (String key: snapshot2.keySet()) { snapshot2.get(key).discardState(); @@ -484,7 +485,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -501,7 +502,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the second snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot2); for (String key: snapshot2.keySet()) { 
snapshot2.get(key).discardState(); @@ -553,7 +554,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -612,7 +613,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); @@ -674,7 +675,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); - backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100); + backend.injectKeyValueStateSnapshots((HashMap) snapshot1); for (String key: snapshot1.keySet()) { snapshot1.get(key).discardState(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 7b55987..4b90b88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -201,9 
+201,7 @@ public class TaskAsyncCallTest { } @Override - public void setInitialState(StateHandle<Serializable> stateHandle, long ts) throws Exception { - - } + public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {} @Override public boolean triggerCheckpoint(long checkpointId, long timestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index b13e7a8..fda5efd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -406,8 +406,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } @Override - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { - super.restoreState(state, recoveryTimestamp); + public void restoreState(StreamTaskState state) throws Exception { + super.restoreState(state); StreamStateHandle stream = (StreamStateHandle) state.getOperatorState(); @@ -427,6 +427,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } // read the state of the format + @SuppressWarnings("unchecked") S formatState = (S) ois.readObject(); // set the whole reader state for the open() to find. 
http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 7755347..0269a34 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -195,11 +195,11 @@ public abstract class AbstractStreamOperator<OUT> @Override @SuppressWarnings("rawtypes,unchecked") - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { + public void restoreState(StreamTaskState state) throws Exception { // restore the key/value state. 
the actual restore happens lazily, when the function requests // the state again, because the restore method needs information provided by the user function if (stateBackend != null) { - stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates(), recoveryTimestamp); + stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 86b07d6..1ddd934 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -150,8 +150,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends } @Override - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { - super.restoreState(state, recoveryTimestamp); + public void restoreState(StreamTaskState state) throws Exception { + super.restoreState(state); StateHandle<Serializable> stateHandle = state.getFunctionState(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 4572ef1..3e38165 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -115,13 +115,11 @@ public interface StreamOperator<OUT> extends Serializable { * * @param state The state of operator that was snapshotted as part of checkpoint * from which the execution is restored. - * - * @param recoveryTimestamp Global recovery timestamp * * @throws Exception Exceptions during state restore should be forwarded, so that the system can * properly react to failed state restore and fail the execution attempt. */ - void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception; + void restoreState(StreamTaskState state) throws Exception; /** * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager. http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 5545717..b268c7a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -109,8 +109,8 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I } @Override - public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { - super.restoreState(state, recoveryTimestamp); + public void restoreState(StreamTaskState state) throws Exception { + super.restoreState(state); this.state = (ExactlyOnceState) 
state.getFunctionState(); out = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index 32c4e67..fdc8117 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -261,8 +261,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } @Override - public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { - super.restoreState(taskState, recoveryTimestamp); + public void restoreState(StreamTaskState taskState) throws Exception { + super.restoreState(taskState); @SuppressWarnings("unchecked") StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index b6ca564..bb05d2b 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -846,8 +846,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { - super.restoreState(taskState, recoveryTimestamp); + public void restoreState(StreamTaskState taskState) throws Exception { + super.restoreState(taskState); final ClassLoader userClassloader = getUserCodeClassloader(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a5de312..6ad94b4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -156,8 +156,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> /** Flag to mark this task as canceled */ private volatile boolean canceled; - private long recoveryTimestamp; - private long lastCheckpointSize = 0; // ------------------------------------------------------------------------ @@ -498,13 +496,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // ------------------------------------------------------------------------ // Checkpoint and Restore // ------------------------------------------------------------------------ - + @Override - public void setInitialState(StreamTaskStateList initialState, long recoveryTimestamp) { + public 
void setInitialState(StreamTaskStateList initialState) { lazyRestoreState = initialState; - this.recoveryTimestamp = recoveryTimestamp; } - + private void restoreState() throws Exception { if (lazyRestoreState != null) { LOG.info("Restoring checkpointed state to task {}", getName()); @@ -522,7 +519,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> if (state != null && operator != null) { LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName()); - operator.restoreState(state, recoveryTimestamp); + operator.restoreState(state); } else if (operator != null) { LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java index 3f3a387..1d706d1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java @@ -201,7 +201,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink task.getOperator().close(); task.getOperator().open(); - task.getOperator().restoreState(states.get(states.size() - 1), 0); + task.getOperator().restoreState(states.get(states.size() - 1)); for (int x = 0; x < 20; x++) { testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2))); 
http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index eb087c6..d2f8e05 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -512,7 +512,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSize); op.setup(mockTask, new StreamConfig(new Configuration()), out2); - op.restoreState(state, 1); + op.restoreState(state); op.open(); // inject some more elements @@ -609,7 +609,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSlide); op.setup(mockTask, new StreamConfig(new Configuration()), out2); - op.restoreState(state, 1); + op.restoreState(state); op.open(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index af46513..585eaa7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -609,7 +609,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSize); op.setup(mockTask, new StreamConfig(new Configuration()), out2); - op.restoreState(state, 1); + op.restoreState(state); op.open(); // inject the remaining elements @@ -717,7 +717,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSlide); op.setup(mockTask, new StreamConfig(new Configuration()), out2); - op.restoreState(state, 1); + op.restoreState(state); op.open(); http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java index 8e1cadf..b74903a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java @@ -172,8 +172,8 @@ public class StreamTaskAsyncCheckpointTest { } @Override - public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception { - super.restoreState(taskState, recoveryTimestamp); + public void 
restoreState(StreamTaskState taskState) throws Exception { + super.restoreState(taskState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 66bdb57..7d0fc57 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -195,10 +195,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState, long)} ()} + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)} ()} */ public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception { - operator.restoreState(snapshot, recoveryTimestamp); + operator.restoreState(snapshot); } /**
