[FLINK-5559] [queryable state] Throw proper IOException in deserializeKeyAndNamespace
This adds the hint that a deserialisation failure probably results from a "mismatch in the key/namespace serializers used by the KvState instance and this access" to all thrown exceptions. This closes #3172. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21742b2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21742b2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21742b2d Branch: refs/heads/master Commit: 21742b2d77c0a6bb254f27e954f95efdab009539 Parents: 563c3a4 Author: Nico Kruber <[email protected]> Authored: Wed Jan 18 18:31:20 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Sun Jan 22 12:00:14 2017 +0100 ---------------------------------------------------------------------- .../netty/message/KvStateRequestSerializer.java | 33 ++++++++-------- .../query/netty/KvStateServerHandlerTest.java | 4 +- .../message/KvStateRequestSerializerTest.java | 40 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index eb37106..2f32861 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -363,8 +363,7 @@ public final class KvStateRequestSerializer { * @param <K> Key type * @param <N> Namespace * @return Tuple2 holding deserialized key and namespace - * @throws 
IOException Serialization errors are forwarded - * @throws IllegalStateException If unexpected magic number between key and namespace + * @throws IOException if the deserialization fails for any reason */ public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace( byte[] serializedKeyAndNamespace, @@ -376,22 +375,24 @@ public final class KvStateRequestSerializer { 0, serializedKeyAndNamespace.length); - K key = keySerializer.deserialize(dis); - byte magicNumber = dis.readByte(); - if (magicNumber != 42) { - throw new IllegalArgumentException("Unexpected magic number " + magicNumber + - ". This indicates a mismatch in the key serializers used by the " + - "KvState instance and this access."); - } - N namespace = namespaceSerializer.deserialize(dis); + try { + K key = keySerializer.deserialize(dis); + byte magicNumber = dis.readByte(); + if (magicNumber != 42) { + throw new IOException("Unexpected magic number " + magicNumber + "."); + } + N namespace = namespaceSerializer.deserialize(dis); - if (dis.available() > 0) { - throw new IllegalArgumentException("Unconsumed bytes in the serialized key " + - "and namespace. This indicates a mismatch in the key/namespace " + - "serializers used by the KvState instance and this access."); - } + if (dis.available() > 0) { + throw new IOException("Unconsumed bytes in the serialized key and namespace."); + } - return new Tuple2<>(key, namespace); + return new Tuple2<>(key, namespace); + } catch (IOException e) { + throw new IOException("Unable to deserialize key " + + "and namespace. 
This indicates a mismatch in the key/namespace " + + "serializers used by the KvState instance and this access.", e); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index e8caf57..b1ec86f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -554,7 +554,7 @@ public class KvStateServerHandlerTest extends TestLogger { assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); assertEquals(182828, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IllegalArgumentException")); + assertTrue(response.getCause().getMessage().contains("IOException")); // Repeat with wrong namespace only request = KvStateRequestSerializer.serializeKvStateRequest( @@ -573,7 +573,7 @@ public class KvStateServerHandlerTest extends TestLogger { assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); assertEquals(182829, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IllegalArgumentException")); + assertTrue(response.getCause().getMessage().contains("IOException")); assertEquals(2, stats.getNumRequests()); assertEquals(2, stats.getNumFailed()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/21742b2d/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index 9d6d27c..0d9c2e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -220,6 +220,46 @@ public class KvStateRequestSerializerTest { } /** + * Tests key and namespace deserialization utils with too few bytes. + */ + @Test(expected = IOException.class) + public void testKeyAndNamespaceDeserializationEmpty() throws Exception { + KvStateRequestSerializer.deserializeKeyAndNamespace( + new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); + } + + /** + * Tests key and namespace deserialization utils with too few bytes. + */ + @Test(expected = IOException.class) + public void testKeyAndNamespaceDeserializationTooShort() throws Exception { + KvStateRequestSerializer.deserializeKeyAndNamespace( + new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); + } + + /** + * Tests key and namespace deserialization utils with too many bytes. + */ + @Test(expected = IOException.class) + public void testKeyAndNamespaceDeserializationTooMany1() throws Exception { + // Long + null String + 1 byte + KvStateRequestSerializer.deserializeKeyAndNamespace( + new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE, + StringSerializer.INSTANCE); + } + + /** + * Tests key and namespace deserialization utils with too many bytes. 
+ */ + @Test(expected = IOException.class) + public void testKeyAndNamespaceDeserializationTooMany2() throws Exception { + // Long + null String + 2 bytes + KvStateRequestSerializer.deserializeKeyAndNamespace( + new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE, + StringSerializer.INSTANCE); + } + + /** * Tests value serialization utils. */ @Test
