Repository: flink Updated Branches: refs/heads/release-1.3 ce685dbda -> d552b3447
[FLINK-6662] [errMsg] Improve error message if recovery from RetrievableStateHandles fails When recovering state from a ZooKeeperStateHandleStore, deserialization can fail if one tries to recover state written by an older Flink version that is not compatible. In this case we should output a better error message so that the user can easily spot the problem. This closes #3972. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d552b344 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d552b344 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d552b344 Branch: refs/heads/release-1.3 Commit: d552b34470748de803a999c2c4c1557c49b30045 Parents: ce685db Author: Till Rohrmann <[email protected]> Authored: Tue May 23 15:42:38 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue May 23 20:25:18 2017 +0200 ---------------------------------------------------------------------- .../store/ZooKeeperMesosWorkerStore.java | 25 ++++++++++++++++---- .../ZooKeeperCompletedCheckpointStore.java | 11 +++++++-- .../ZooKeeperSubmittedJobGraphStore.java | 12 ++++++++-- .../runtime/state/RetrievableStateHandle.java | 3 ++- .../state/RetrievableStreamStateHandle.java | 2 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 3 ++- .../ZooKeeperCompletedCheckpointStoreTest.java | 3 ++- .../ZooKeeperStateHandleStoreTest.java | 2 +- 8 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java index 663ce56..4544b8e 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java @@ -25,12 +25,14 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount; import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue; +import org.apache.flink.util.FlinkException; import org.apache.mesos.Protos; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -171,19 +173,32 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { List<Tuple2<RetrievableStateHandle<Worker>, String>> handles = workersInZooKeeper.getAllAndLock(); - if(handles.size() != 0) { + if (handles.isEmpty()) { + return Collections.emptyList(); + } + else { List<MesosWorkerStore.Worker> workers = new ArrayList<>(handles.size()); + for (Tuple2<RetrievableStateHandle<Worker>, String> handle : handles) { - Worker worker = handle.f0.retrieveState(); + final Worker worker; + + try { + worker = handle.f0.retrieveState(); + } catch (ClassNotFoundException cnfe) { + throw new FlinkException("Could not retrieve Mesos worker from state handle under " + + handle.f1 + ". This indicates that you are trying to recover from state written by an " + + "older Flink version which is not compatible. 
Try cleaning the state handle store.", cnfe); + } catch (IOException ioe) { + throw new FlinkException("Could not retrieve Mesos worker from state handle under " + + handle.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning " + + "the state handle store.", ioe); + } workers.add(worker); } return workers; } - else { - return Collections.emptyList(); - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 469c1b1..c4cb6bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.ConcurrentModificationException; @@ -376,8 +377,14 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto try { return stateHandlePath.f0.retrieveState(); - } catch (Exception e) { - throw new FlinkException("Could not retrieve checkpoint " + checkpointId + ". The state handle seems to be broken.", e); + } catch (ClassNotFoundException cnfe) { + throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " + + stateHandlePath.f1 + ". This indicates that you are trying to recover from state written by an " + + "older Flink version which is not compatible. 
Try cleaning the state handle store.", cnfe); + } catch (IOException ioe) { + throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " + + stateHandlePath.f1 + ". This indicates that the retrieved state handle is broken. Try cleaning the " + + "state handle store.", ioe); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index fa972ed..f31c970 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -27,10 +27,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.util.FlinkException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -182,8 +184,14 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { try { jobGraph = jobGraphRetrievableStateHandle.retrieveState(); - } catch (Exception e) { - throw new Exception("Failed to retrieve the submitted job graph from state handle.", e); + } catch (ClassNotFoundException cnfe) { + throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + + ". 
This indicates that you are trying to recover from state written by an " + + "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe); + } catch (IOException ioe) { + throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + path + + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " + + "store.", ioe); } addedJobGraphs.add(jobGraph.getJobId()); http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java index d547624..30ac8f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; +import java.io.IOException; import java.io.Serializable; /** @@ -28,5 +29,5 @@ public interface RetrievableStateHandle<T extends Serializable> extends StateObj /** * Retrieves the object that was previously written to state. 
*/ - T retrieveState() throws Exception; + T retrieveState() throws IOException, ClassNotFoundException; } http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index 653e227..6ed60fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -53,7 +53,7 @@ public class RetrievableStreamStateHandle<T extends Serializable> implements } @Override - public T retrieveState() throws Exception { + public T retrieveState() throws IOException, ClassNotFoundException { try (FSDataInputStream in = openInputStream()) { return InstantiationUtil.deserializeObject(in, Thread.currentThread().getContextClassLoader()); } http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 81ee4f9..77423c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -297,8 +297,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint stateMap.put(key, state); } + 
@SuppressWarnings("unchecked") @Override - public T retrieveState() throws Exception { + public T retrieveState() { return (T) stateMap.get(key); } http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 23cc8c8..b5854dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -39,6 +39,7 @@ import org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -93,7 +94,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { expectedCheckpointIds.add(2L); final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle = mock(RetrievableStateHandle.class); - when(failingRetrievableStateHandle.retrieveState()).thenThrow(new Exception("Test exception")); + when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception")); final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 = mock(RetrievableStateHandle.class); when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1); http://git-wip-us.apache.org/repos/asf/flink/blob/d552b344/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index 0c215cd..fd39b25 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -784,7 +784,7 @@ public class ZooKeeperStateHandleStoreTest extends TestLogger { } @Override - public Long retrieveState() throws Exception { + public Long retrieveState() { return state; }
