[FLINK-5559] [queryable state] Throw proper IOException 
deserializeKeyAndNamespace

This adds the hint that a deserialisation failure probably results from a
"mismatch in the key/namespace serializers used by the KvState instance and this
access" to all thrown exceptions.

This closes #3172.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a0e3d61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a0e3d61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a0e3d61

Branch: refs/heads/release-1.2
Commit: 7a0e3d610fb3c604e0dfdcc7d0df9ea576549e97
Parents: f592d4c
Author: Nico Kruber <[email protected]>
Authored: Wed Jan 18 18:31:20 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Sun Jan 22 15:57:01 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 33 ++++++++--------
 .../query/netty/KvStateServerHandlerTest.java   |  4 +-
 .../message/KvStateRequestSerializerTest.java   | 40 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a0e3d61/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index eb37106..2f32861 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -363,8 +363,7 @@ public final class KvStateRequestSerializer {
         * @param <K>                       Key type
         * @param <N>                       Namespace
         * @return Tuple2 holding deserialized key and namespace
-        * @throws IOException           Serialization errors are forwarded
-        * @throws IllegalStateException If unexpected magic number between key 
and namespace
+        * @throws IOException              if the deserialization fails for 
any reason
         */
        public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
                        byte[] serializedKeyAndNamespace,
@@ -376,22 +375,24 @@ public final class KvStateRequestSerializer {
                                0,
                                serializedKeyAndNamespace.length);
 
-               K key = keySerializer.deserialize(dis);
-               byte magicNumber = dis.readByte();
-               if (magicNumber != 42) {
-                       throw new IllegalArgumentException("Unexpected magic 
number " + magicNumber +
-                                       ". This indicates a mismatch in the key 
serializers used by the " +
-                                       "KvState instance and this access.");
-               }
-               N namespace = namespaceSerializer.deserialize(dis);
+               try {
+                       K key = keySerializer.deserialize(dis);
+                       byte magicNumber = dis.readByte();
+                       if (magicNumber != 42) {
+                               throw new IOException("Unexpected magic number 
" + magicNumber + ".");
+                       }
+                       N namespace = namespaceSerializer.deserialize(dis);
 
-               if (dis.available() > 0) {
-                       throw new IllegalArgumentException("Unconsumed bytes in 
the serialized key " +
-                                       "and namespace. This indicates a 
mismatch in the key/namespace " +
-                                       "serializers used by the KvState 
instance and this access.");
-               }
+                       if (dis.available() > 0) {
+                               throw new IOException("Unconsumed bytes in the 
serialized key and namespace.");
+                       }
 
-               return new Tuple2<>(key, namespace);
+                       return new Tuple2<>(key, namespace);
+               } catch (IOException e) {
+                       throw new IOException("Unable to deserialize key " +
+                               "and namespace. This indicates a mismatch in 
the key/namespace " +
+                               "serializers used by the KvState instance and 
this access.", e);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7a0e3d61/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index e8caf57..b1ec86f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
                KvStateRequestFailure response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
                assertEquals(182828, response.getRequestId());
-               
assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+               
assertTrue(response.getCause().getMessage().contains("IOException"));
 
                // Repeat with wrong namespace only
                request = KvStateRequestSerializer.serializeKvStateRequest(
@@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
                response = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
                assertEquals(182829, response.getRequestId());
-               
assertTrue(response.getCause().getMessage().contains("IllegalArgumentException"));
+               
assertTrue(response.getCause().getMessage().contains("IOException"));
 
                assertEquals(2, stats.getNumRequests());
                assertEquals(2, stats.getNumFailed());

http://git-wip-us.apache.org/repos/asf/flink/blob/7a0e3d61/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 9d6d27c..0d9c2e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest {
        }
 
        /**
+        * Tests key and namespace deserialization utils with too few bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
+               KvStateRequestSerializer.deserializeKeyAndNamespace(
+                       new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests key and namespace deserialization utils with too few bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testKeyAndNamespaceDeserializationTooShort() throws 
Exception {
+               KvStateRequestSerializer.deserializeKeyAndNamespace(
+                       new byte[] {1}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests key and namespace deserialization utils with too many bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testKeyAndNamespaceDeserializationTooMany1() throws 
Exception {
+               // Long + null String + 1 byte
+               KvStateRequestSerializer.deserializeKeyAndNamespace(
+                       new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, 
LongSerializer.INSTANCE,
+                       StringSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests key and namespace deserialization utils with too many bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testKeyAndNamespaceDeserializationTooMany2() throws 
Exception {
+               // Long + null String + 2 bytes
+               KvStateRequestSerializer.deserializeKeyAndNamespace(
+                       new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, 
LongSerializer.INSTANCE,
+                       StringSerializer.INSTANCE);
+       }
+
+       /**
         * Tests value serialization utils.
         */
        @Test

Reply via email to