Repository: flink
Updated Branches:
  refs/heads/master 557540a51 -> ac6e5c9a0


[FLINK-6662] [errMsg] Improve error message if recovery from RetrievableStateHandles fails

When recovering state from a ZooKeeperStateHandleStore, deserialization can
fail because the state was written by an older Flink version that is not
compatible with the current one. In this case we should output a clearer
error message so that the user can easily spot the problem.

This closes #3972.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac6e5c9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac6e5c9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac6e5c9a

Branch: refs/heads/master
Commit: ac6e5c9a03177ad18899e27c8877efb0c9211842
Parents: 557540a
Author: Till Rohrmann <[email protected]>
Authored: Tue May 23 15:42:38 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 23 20:24:28 2017 +0200

----------------------------------------------------------------------
 .../store/ZooKeeperMesosWorkerStore.java        | 25 ++++++++++++++++----
 .../ZooKeeperCompletedCheckpointStore.java      | 11 +++++++--
 .../ZooKeeperSubmittedJobGraphStore.java        | 12 ++++++++--
 .../runtime/state/RetrievableStateHandle.java   |  3 ++-
 .../state/RetrievableStreamStateHandle.java     |  2 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  3 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  3 ++-
 .../ZooKeeperStateHandleStoreTest.java          |  2 +-
 8 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 663ce56..4544b8e 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -25,12 +25,14 @@ import 
org.apache.flink.runtime.zookeeper.ZooKeeperSharedCount;
 import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
+import org.apache.flink.util.FlinkException;
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -171,19 +173,32 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
 
                        List<Tuple2<RetrievableStateHandle<Worker>, String>> 
handles = workersInZooKeeper.getAllAndLock();
 
-                       if(handles.size() != 0) {
+                       if (handles.isEmpty()) {
+                               return Collections.emptyList();
+                       }
+                       else {
                                List<MesosWorkerStore.Worker> workers = new 
ArrayList<>(handles.size());
+
                                for (Tuple2<RetrievableStateHandle<Worker>, 
String> handle : handles) {
-                                       Worker worker = 
handle.f0.retrieveState();
+                                       final Worker worker;
+
+                                       try {
+                                               worker = 
handle.f0.retrieveState();
+                                       } catch (ClassNotFoundException cnfe) {
+                                               throw new FlinkException("Could 
not retrieve Mesos worker from state handle under " +
+                                                       handle.f1 + ". This 
indicates that you are trying to recover from state written by an " +
+                                                       "older Flink version 
which is not compatible. Try cleaning the state handle store.", cnfe);
+                                       } catch (IOException ioe) {
+                                               throw new FlinkException("Could 
not retrieve Mesos worker from state handle under " +
+                                                       handle.f1 + ". This 
indicates that the retrieved state handle is broken. Try cleaning " +
+                                                       "the state handle 
store.", ioe);
+                                       }
 
                                        workers.add(worker);
                                }
 
                                return workers;
                        }
-                       else {
-                               return Collections.emptyList();
-                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 469c1b1..c4cb6bc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
@@ -376,8 +377,14 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                try {
                        return stateHandlePath.f0.retrieveState();
-               } catch (Exception e) {
-                       throw new FlinkException("Could not retrieve checkpoint 
" + checkpointId + ". The state handle seems to be broken.", e);
+               } catch (ClassNotFoundException cnfe) {
+                       throw new FlinkException("Could not retrieve checkpoint 
" + checkpointId + " from state handle under " +
+                               stateHandlePath.f1 + ". This indicates that you 
are trying to recover from state written by an " +
+                               "older Flink version which is not compatible. 
Try cleaning the state handle store.", cnfe);
+               } catch (IOException ioe) {
+                       throw new FlinkException("Could not retrieve checkpoint 
" + checkpointId + " from state handle under " +
+                               stateHandlePath.f1 + ". This indicates that the 
retrieved state handle is broken. Try cleaning the " +
+                               "state handle store.", ioe);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index fa972ed..f31c970 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.FlinkException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -182,8 +184,14 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
 
                                try {
                                        jobGraph = 
jobGraphRetrievableStateHandle.retrieveState();
-                               } catch (Exception e) {
-                                       throw new Exception("Failed to retrieve 
the submitted job graph from state handle.", e);
+                               } catch (ClassNotFoundException cnfe) {
+                                       throw new FlinkException("Could not 
retrieve submitted JobGraph from state handle under " + path +
+                                               ". This indicates that you are 
trying to recover from state written by an " +
+                                               "older Flink version which is 
not compatible. Try cleaning the state handle store.", cnfe);
+                               } catch (IOException ioe) {
+                                       throw new FlinkException("Could not 
retrieve submitted JobGraph from state handle under " + path +
+                                               ". This indicates that the 
retrieved state handle is broken. Try cleaning the state handle " +
+                                               "store.", ioe);
                                }
 
                                addedJobGraphs.add(jobGraph.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
index d547624..30ac8f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 /**
@@ -28,5 +29,5 @@ public interface RetrievableStateHandle<T extends 
Serializable> extends StateObj
        /**
         * Retrieves the object that was previously written to state.
         */
-       T retrieveState() throws Exception;
+       T retrieveState() throws IOException, ClassNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 653e227..6ed60fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -53,7 +53,7 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
        }
 
        @Override
-       public T retrieveState() throws Exception {
+       public T retrieveState() throws IOException, ClassNotFoundException {
                try (FSDataInputStream in = openInputStream()) {
                        return InstantiationUtil.deserializeObject(in, 
Thread.currentThread().getContextClassLoader());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 81ee4f9..77423c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -297,8 +297,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                        stateMap.put(key, state);
                }
 
+               @SuppressWarnings("unchecked")
                @Override
-               public T retrieveState() throws Exception {
+               public T retrieveState() {
                        return (T) stateMap.get(key);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 23cc8c8..b5854dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -39,6 +39,7 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -93,7 +94,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                expectedCheckpointIds.add(2L);
 
                final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-               
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
Exception("Test exception"));
+               
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
 
                final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
                
when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ac6e5c9a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index 0c215cd..fd39b25 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -784,7 +784,7 @@ public class ZooKeeperStateHandleStoreTest extends 
TestLogger {
                }
 
                @Override
-               public Long retrieveState() throws Exception {
+               public Long retrieveState() {
                        return state;
                }
 

Reply via email to