Repository: flink Updated Branches: refs/heads/master d16552dbe -> 21742b2d7
[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue KvStateRequestSerializer#deserializeValue deserializes a given byte array. This is used by clients and unit tests and it is fair to assume that these byte arrays represent a complete value since we do not offer a method to continue reading from the middle of the array anyway. Therefore, we can treat unconsumed bytes as errors, e.g. from a wrong serializer being used, and throw an IOException with an appropriate failure message. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1c6ef1e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1c6ef1e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1c6ef1e Branch: refs/heads/master Commit: c1c6ef1e7346cc5b3dfb6e8b7ae3e782b407b8c7 Parents: d16552d Author: Nico Kruber <[email protected]> Authored: Thu Jan 19 17:06:00 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Sun Jan 22 11:55:44 2017 +0100 ---------------------------------------------------------------------- .../netty/message/KvStateRequestSerializer.java | 12 +++++-- .../message/KvStateRequestSerializerTest.java | 38 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 6c8b4a5..5c60e59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -428,8 +428,16 @@ public final class KvStateRequestSerializer { if (serializedValue == null) { return null; } else { - DataInputDeserializer deser = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - return serializer.deserialize(deser); + final DataInputDeserializer deser = new DataInputDeserializer( + serializedValue, 0, serializedValue.length); + final T value = serializer.deserialize(deser); + if (deser.available() > 0) { + throw new IOException( + "Unconsumed bytes in the deserialized value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access."); + } + return value; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index a9aa416..9552531 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest { } /** + * Tests value deserialization with too few bytes. 
+ */ + @Test(expected = IOException.class) + public void testDeserializeValueEmpty() throws Exception { + KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too few bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooShort() throws Exception { + // 1 byte (incomplete Long) + KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too many bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooMany1() throws Exception { + // Long + 1 byte + KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, + LongSerializer.INSTANCE); + } + + /** + * Tests value deserialization with too many bytes. + */ + @Test(expected = IOException.class) + public void testDeserializeValueTooMany2() throws Exception { + // Long + 2 bytes + KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, + LongSerializer.INSTANCE); + } + + /** * Tests list serialization utils. */ @Test
