Repository: flink Updated Branches: refs/heads/release-1.2 d8222c117 -> 7a0e3d610
[FLINK-5576] [queryable state] Improve failure message deserializeList As in FLINK-5559, wrap the original IOException into a new one with an appropriate error message to better diagnose it. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd639819 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd639819 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd639819 Branch: refs/heads/release-1.2 Commit: fd639819cbe030ec3721f6d970ad527b1762e713 Parents: ef13f48 Author: Nico Kruber <[email protected]> Authored: Thu Jan 19 17:10:54 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Sun Jan 22 15:56:22 2017 +0100 ---------------------------------------------------------------------- .../netty/message/KvStateRequestSerializer.java | 40 ++++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd639819/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 5c60e59..eb37106 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -453,24 +453,32 @@ public final class KvStateRequestSerializer { */ public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { if (serializedValue != null) { - DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - - List<T> result = new ArrayList<>(); - while (in.available() > 0) { - result.add(serializer.deserialize(in)); - - // The expected binary format has a single byte separator. We - // want a consistent binary format in order to not need any - // special casing during deserialization. A "cleaner" format - // would skip this extra byte, but would require a memory copy - // for RocksDB, which stores the data serialized in this way - // for lists. - if (in.available() > 0) { - in.readByte(); + final DataInputDeserializer in = new DataInputDeserializer( + serializedValue, 0, serializedValue.length); + + try { + final List<T> result = new ArrayList<>(); + while (in.available() > 0) { + result.add(serializer.deserialize(in)); + + // The expected binary format has a single byte separator. We + // want a consistent binary format in order to not need any + // special casing during deserialization. A "cleaner" format + // would skip this extra byte, but would require a memory copy + // for RocksDB, which stores the data serialized in this way + // for lists. + if (in.available() > 0) { + in.readByte(); + } } - } - return result; + return result; + } catch (IOException e) { + throw new IOException( + "Unable to deserialize value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access.", e); + } } else { return null; }
