Repository: flink
Updated Branches:
  refs/heads/master d16552dbe -> 21742b2d7


[FLINK-5576] [queryable state] Check unconsumed bytes in deserializeValue

KvStateRequestSerializer#deserializeValue deserializes a given byte array. This
is used by clients and unit tests, and it is fair to assume that these byte
arrays represent a complete value, since we do not offer a method to continue
reading from the middle of the array anyway. Therefore, we can treat unconsumed
bytes as errors, e.g. from a wrong serializer being used, and throw an
IOException with an appropriate failure message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1c6ef1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1c6ef1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1c6ef1e

Branch: refs/heads/master
Commit: c1c6ef1e7346cc5b3dfb6e8b7ae3e782b407b8c7
Parents: d16552d
Author: Nico Kruber <[email protected]>
Authored: Thu Jan 19 17:06:00 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Sun Jan 22 11:55:44 2017 +0100

----------------------------------------------------------------------
 .../netty/message/KvStateRequestSerializer.java | 12 +++++--
 .../message/KvStateRequestSerializerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 6c8b4a5..5c60e59 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -428,8 +428,16 @@ public final class KvStateRequestSerializer {
                if (serializedValue == null) {
                        return null;
                } else {
-                       DataInputDeserializer deser = new 
DataInputDeserializer(serializedValue, 0, serializedValue.length);
-                       return serializer.deserialize(deser);
+                       final DataInputDeserializer deser = new 
DataInputDeserializer(
+                               serializedValue, 0, serializedValue.length);
+                       final T value = serializer.deserialize(deser);
+                       if (deser.available() > 0) {
+                               throw new IOException(
+                                       "Unconsumed bytes in the deserialized 
value. " +
+                                               "This indicates a mismatch in 
the value serializers " +
+                                               "used by the KvState instance 
and this access.");
+                       }
+                       return value;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1c6ef1e/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index a9aa416..9552531 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -233,6 +234,43 @@ public class KvStateRequestSerializerTest {
        }
 
        /**
+        * Tests value deserialization with too few bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testDeserializeValueEmpty() throws Exception {
+               KvStateRequestSerializer.deserializeValue(new byte[] {}, 
LongSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests value deserialization with too few bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testDeserializeValueTooShort() throws Exception {
+               // 1 byte (incomplete Long)
+               KvStateRequestSerializer.deserializeValue(new byte[] {1}, 
LongSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests value deserialization with too many bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testDeserializeValueTooMany1() throws Exception {
+               // Long + 1 byte
+               KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2},
+                       LongSerializer.INSTANCE);
+       }
+
+       /**
+        * Tests value deserialization with too many bytes.
+        */
+       @Test(expected = IOException.class)
+       public void testDeserializeValueTooMany2() throws Exception {
+               // Long + 2 bytes
+               KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2, 2},
+                       LongSerializer.INSTANCE);
+       }
+
+       /**
         * Tests list serialization utils.
         */
        @Test

Reply via email to