Repository: flink
Updated Branches:
  refs/heads/master 67b380d61 -> 92efcd34a


[hotfix] Remove 'ByteArrayInputView' and replace deserialization in 
TypeInformationSerializationSchema with more efficient reusable buffers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92efcd34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92efcd34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92efcd34

Branch: refs/heads/master
Commit: 92efcd34a5da2bccb07666f2c647974ea3e7c94f
Parents: 67b380d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 1 14:39:24 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 1 17:29:02 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/ByteArrayInputView.java   | 40 ----------------
 .../runtime/kryo/KryoClearedBufferTest.java     |  8 +++-
 .../runtime/util/DataInputDeserializer.java     | 48 ++++++++++++--------
 .../runtime/util/DataOutputSerializer.java      | 25 ++++++----
 ...eInformationKeyValueSerializationSchema.java | 44 +++++++++++++-----
 .../connectors/kafka/KafkaConsumerTestBase.java | 17 +++++--
 .../TypeInformationSerializationSchema.java     | 14 ++++--
 .../TypeInformationSerializationSchemaTest.java |  2 +-
 8 files changed, 109 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
deleted file mode 100644
index 48d6a3d..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class ByteArrayInputView extends DataInputStream implements 
DataInputView {
-
-       public ByteArrayInputView(byte[] data) {
-               super(new ByteArrayInputStream(data));
-       }
-
-       @Override
-       public void skipBytesToRead(int numBytes) throws IOException {
-               while (numBytes > 0) {
-                       int skipped = skipBytes(numBytes);
-                       numBytes -= skipped;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index ab2e45f..7572408 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,16 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,7 +72,8 @@ public class KryoClearedBufferTest {
                        // now the Kryo Output should have been cleared
                }
 
-               TestRecord actualRecord = kryoSerializer.deserialize(new 
ByteArrayInputView(target.getBuffer()));
+               TestRecord actualRecord = kryoSerializer.deserialize(
+                               new DataInputViewStreamWrapper(new 
ByteArrayInputStream(target.getBuffer())));
 
                Assert.assertEquals(testRecord, actualRecord);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index e8e8f6d..bdccdd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.EOFException;
@@ -31,7 +30,11 @@ import org.apache.flink.core.memory.MemoryUtils;
 /**
  * A simple and efficient deserializer for the {@link java.io.DataInput} 
interface.
  */
-public class DataInputDeserializer implements DataInputView {
+public class DataInputDeserializer implements DataInputView, 
java.io.Serializable {
+       
+       private static final long serialVersionUID = 1L;
+
+       // 
------------------------------------------------------------------------
        
        private byte[] buffer;
        
@@ -39,8 +42,9 @@ public class DataInputDeserializer implements DataInputView {
 
        private int position;
 
-       public DataInputDeserializer() {
-       }
+       // 
------------------------------------------------------------------------
+       
+       public DataInputDeserializer() {}
        
        public DataInputDeserializer(byte[] buffer, int start, int len) {
                setBuffer(buffer, start, len);
@@ -50,6 +54,10 @@ public class DataInputDeserializer implements DataInputView {
                setBuffer(buffer);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Changing buffers
+       // 
------------------------------------------------------------------------
+       
        public void setBuffer(ByteBuffer buffer) {
                if (buffer.hasArray()) {
                        this.buffer = buffer.array();
@@ -311,44 +319,36 @@ public class DataInputDeserializer implements 
DataInputView {
                        return n;
                }
        }
-       
-       @SuppressWarnings("restriction")
-       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-       
-       @SuppressWarnings("restriction")
-       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
-       
-       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 
        @Override
        public void skipBytesToRead(int numBytes) throws IOException {
                int skippedBytes = skipBytes(numBytes);
 
-               if(skippedBytes < numBytes){
+               if (skippedBytes < numBytes){
                        throw new EOFException("Could not skip " + numBytes +" 
bytes.");
                }
        }
 
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
-               if(b == null){
+               if (b == null){
                        throw new NullPointerException("Byte array b cannot be 
null.");
                }
 
-               if(off < 0){
+               if (off < 0){
                        throw new IndexOutOfBoundsException("Offset cannot be 
negative.");
                }
 
-               if(len < 0){
+               if (len < 0){
                        throw new IndexOutOfBoundsException("Length cannot be 
negative.");
                }
 
-               if(b.length - off < len){
+               if (b.length - off < len){
                        throw new IndexOutOfBoundsException("Byte array does 
not provide enough space to store requested data" +
                                        ".");
                }
 
-               if(this.position >= this.end) {
+               if (this.position >= this.end) {
                        return -1;
                } else {
                        int toRead = Math.min(this.end-this.position, len);
@@ -363,4 +363,16 @@ public class DataInputDeserializer implements 
DataInputView {
        public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
        }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+
+       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 0e93544..18940ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemoryUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,8 @@ public class DataOutputSerializer implements DataOutputView {
        private static final Logger LOG = 
LoggerFactory.getLogger(DataOutputSerializer.class);
        
        private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
+
+       // 
------------------------------------------------------------------------
        
        private final byte[] startBuffer;
        
@@ -47,6 +50,8 @@ public class DataOutputSerializer implements DataOutputView {
        private int position;
 
        private ByteBuffer wrapper;
+
+       // 
------------------------------------------------------------------------
        
        public DataOutputSerializer(int startSize) {
                if (startSize < 1) {
@@ -303,14 +308,6 @@ public class DataOutputSerializer implements 
DataOutputView {
                this.buffer = nb;
                this.wrapper = ByteBuffer.wrap(this.buffer);
        }
-       
-       @SuppressWarnings("restriction")
-       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-       
-       @SuppressWarnings("restriction")
-       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
-       
-       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 
        @Override
        public void skipBytesToWrite(int numBytes) throws IOException {
@@ -330,4 +327,16 @@ public class DataOutputSerializer implements 
DataOutputView {
                source.read(this.buffer, this.position, numBytes);
                this.position += numBytes;
        }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+
+       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index a35c01e..a3f8ba1 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
 import java.io.IOException;
@@ -46,10 +46,16 @@ public class TypeInformationKeyValueSerializationSchema<K, 
V> implements KeyedDe
        /** The serializer for the value */
        private final TypeSerializer<V> valueSerializer;
 
-       /** reusable output serialization buffers */
+       /** reusable input deserialization buffer */
+       private final DataInputDeserializer inputDeserializer;
+       
+       /** reusable output serialization buffer for the key */
        private transient DataOutputSerializer keyOutputSerializer;
-       private transient DataOutputSerializer valueOutputSerializer;
 
+       /** reusable output serialization buffer for the value */
+       private transient DataOutputSerializer valueOutputSerializer;
+       
+       
        /** The type information, to be returned by {@link #getProducedType()}. 
It is
         * transient, because it is not serializable. Note that this means that 
the type information
         * is not available at runtime, but only prior to the first 
serialization / deserialization */
@@ -68,11 +74,22 @@ public class TypeInformationKeyValueSerializationSchema<K, 
V> implements KeyedDe
                this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
                this.keySerializer = keyTypeInfo.createSerializer(ec);
                this.valueSerializer = valueTypeInfo.createSerializer(ec);
+               this.inputDeserializer = new DataInputDeserializer();
        }
 
+       /**
+        * Creates a new de-/serialization schema for the given types. This 
constructor accepts the types
+        * as classes and internally constructs the type information from the 
classes.
+        * 
+        * <p>If the types are parametrized and cannot be fully defined via 
classes, use the constructor
+        * that accepts {@link TypeInformation} instead.
+        * 
+        * @param keyClass The class of the key de-/serialized by this schema.
+        * @param valueClass The class of the value de-/serialized by this 
schema.
+        * @param config The execution config, which is used to parametrize the 
type serializers.
+        */
        public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, 
Class<V> valueClass, ExecutionConfig config) {
-               //noinspection unchecked
-               this( (TypeInformation<K>) 
TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) 
TypeExtractor.createTypeInfo(valueClass), config);
+               this(TypeExtractor.createTypeInfo(keyClass), 
TypeExtractor.createTypeInfo(valueClass), config);
        }
 
        // 
------------------------------------------------------------------------
@@ -81,12 +98,15 @@ public class TypeInformationKeyValueSerializationSchema<K, 
V> implements KeyedDe
        @Override
        public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, 
String topic, int partition, long offset) throws IOException {
                K key = null;
-               if(messageKey != null) {
-                       key = keySerializer.deserialize(new 
ByteArrayInputView(messageKey));
-               }
                V value = null;
-               if(message != null) {
-                       value = valueSerializer.deserialize(new 
ByteArrayInputView(message));
+               
+               if (messageKey != null) {
+                       inputDeserializer.setBuffer(messageKey, 0, 
messageKey.length);
+                       key = keySerializer.deserialize(inputDeserializer);
+               }
+               if (message != null) {
+                       inputDeserializer.setBuffer(message, 0, message.length);
+                       value = valueSerializer.deserialize(inputDeserializer);
                }
                return new Tuple2<>(key, value);
        }
@@ -104,7 +124,7 @@ public class TypeInformationKeyValueSerializationSchema<K, 
V> implements KeyedDe
 
        @Override
        public byte[] serializeKey(Tuple2<K, V> element) {
-               if(element.f0 == null) {
+               if (element.f0 == null) {
                        return null;
                } else {
                        // key is not null. serialize it:
@@ -132,7 +152,7 @@ public class TypeInformationKeyValueSerializationSchema<K, 
V> implements KeyedDe
        @Override
        public byte[] serializeValue(Tuple2<K, V> element) {
                // if the value is null, its serialized value is null as well.
-               if(element.f1 == null) {
+               if (element.f1 == null) {
                        return null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8592182..3d39869 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -39,9 +39,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -82,6 +83,7 @@ import org.junit.Assert;
 
 import org.junit.Rule;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -734,14 +736,16 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
        private static class Tuple2WithTopicDeserializationSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
 
-               TypeSerializer ts;
+               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+               
                public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) 
{
-                       ts = TypeInfoParser.parse("Tuple2<Integer, 
Integer>").createSerializer(ec);
+                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
                }
 
                @Override
                public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
-                       Tuple2<Integer, Integer> t2 = (Tuple2<Integer, 
Integer>) ts.deserialize(new ByteArrayInputView(message));
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
                        return new Tuple3<>(t2.f0, t2.f1, topic);
                }
 
@@ -1103,8 +1107,10 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
        }
 
        public static class FixedNumberDeserializationSchema implements 
DeserializationSchema<Tuple2<Integer, Integer>> {
+               
                final int finalCount;
                int count = 0;
+               
                TypeInformation<Tuple2<Integer, Integer>> ti = 
TypeInfoParser.parse("Tuple2<Integer, Integer>");
                TypeSerializer<Tuple2<Integer, Integer>> ser = 
ti.createSerializer(new ExecutionConfig());
 
@@ -1114,7 +1120,8 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
                @Override
                public Tuple2<Integer, Integer> deserialize(byte[] message) 
throws IOException {
-                       return ser.deserialize(new ByteArrayInputView(message));
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       return ser.deserialize(in);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 6577be8..61876e4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
 import java.io.IOException;
@@ -29,7 +29,6 @@ import java.io.IOException;
 /**
  * A serialization and deserialization schema that uses Flink's serialization 
stack to
 * transform typed objects from and to byte arrays.
- *
  * 
  * @param <T> The type to be serialized.
  */
@@ -42,6 +41,9 @@ public class TypeInformationSerializationSchema<T> implements 
DeserializationSch
 
        /** The reusable output serialization buffer */
        private transient DataOutputSerializer dos;
+       
+       /** The reusable input deserialization buffer */
+       private transient DataInputDeserializer dis;
 
        /** The type information, to be returned by {@link #getProducedType()}. 
It is
         * transient, because it is not serializable. Note that this means that 
the type information
@@ -65,8 +67,14 @@ public class TypeInformationSerializationSchema<T> 
implements DeserializationSch
        
        @Override
        public T deserialize(byte[] message) {
+               if (dis != null) {
+                       dis.setBuffer(message, 0, message.length);
+               } else {
+                       dis = new DataInputDeserializer(message, 0, 
message.length);
+               }
+               
                try {
-                       return serializer.deserialize(new 
ByteArrayInputView(message));
+                       return serializer.deserialize(dis);
                }
                catch (IOException e) {
                        throw new RuntimeException("Unable to deserialize 
message", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index 1c0f850..e722f53 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -112,7 +112,7 @@ public class TypeInformationSerializationSchemaTest {
 
                @Override
                public String toString() {
-                       return String.format("MyPOJO " + aField + " " + aList);
+                       return "MyPOJO " + aField + " " + aList;
                }
        }
 }

Reply via email to