[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 
'flink-core'

These core flink utils are independent of any other runtime classes and
are also used both in flink-runtime and in flink-queryable-state (which 
duplicated
the code).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37df826e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37df826e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37df826e

Branch: refs/heads/master
Commit: 37df826e4e1355a2cba89a85ea94257d40785cb2
Parents: 198b74a
Author: Stephan Ewen <[email protected]>
Authored: Thu Nov 2 18:27:23 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Nov 2 19:20:06 2017 +0100

----------------------------------------------------------------------
 ...eInformationKeyValueSerializationSchema.java |   4 +-
 .../core/memory/DataInputDeserializer.java      | 387 ++++++++++++++++++
 .../flink/core/memory/DataOutputSerializer.java | 337 ++++++++++++++++
 .../core/memory/DataInputDeserializerTest.java  |  59 +++
 .../memory/DataInputOutputSerializerTest.java   | 123 ++++++
 .../serialization/types/AsciiStringType.java    |  85 ++++
 .../serialization/types/BooleanType.java        |  74 ++++
 .../serialization/types/ByteArrayType.java      |  83 ++++
 .../serialization/types/ByteSubArrayType.java   |  98 +++++
 .../testutils/serialization/types/ByteType.java |  74 ++++
 .../testutils/serialization/types/CharType.java |  75 ++++
 .../serialization/types/DoubleType.java         |  75 ++++
 .../serialization/types/FloatType.java          |  74 ++++
 .../testutils/serialization/types/IntType.java  |  74 ++++
 .../testutils/serialization/types/LongType.java |  74 ++++
 .../types/SerializationTestType.java            |  32 ++
 .../types/SerializationTestTypeFactory.java     |  47 +++
 .../serialization/types/ShortType.java          |  74 ++++
 .../serialization/types/UnsignedByteType.java   |  74 ++++
 .../serialization/types/UnsignedShortType.java  |  74 ++++
 .../testutils/serialization/types/Util.java     | 106 +++++
 .../serialization/DataInputDeserializer.java    | 392 -------------------
 .../serialization/DataOutputSerializer.java     | 344 ----------------
 .../state/serialization/KvStateSerializer.java  |   2 +
 .../AdaptiveSpanningRecordDeserializer.java     |   4 +-
 .../api/serialization/EventSerializer.java      |   4 +-
 .../serialization/SpanningRecordSerializer.java |   2 +-
 ...llingAdaptiveSpanningRecordDeserializer.java |   2 +-
 .../metrics/dump/MetricDumpSerialization.java   |   4 +-
 .../runtime/util/DataInputDeserializer.java     | 390 ------------------
 .../runtime/util/DataOutputSerializer.java      | 342 ----------------
 .../io/network/api/CheckpointBarrierTest.java   |   4 +-
 .../api/serialization/PagedViewsTest.java       |   6 +-
 .../SpanningRecordSerializationTest.java        |   6 +-
 .../SpanningRecordSerializerTest.java           |   6 +-
 .../serialization/types/AsciiStringType.java    |  85 ----
 .../api/serialization/types/BooleanType.java    |  74 ----
 .../api/serialization/types/ByteArrayType.java  |  83 ----
 .../serialization/types/ByteSubArrayType.java   |  98 -----
 .../api/serialization/types/ByteType.java       |  74 ----
 .../api/serialization/types/CharType.java       |  75 ----
 .../api/serialization/types/DoubleType.java     |  75 ----
 .../api/serialization/types/FloatType.java      |  74 ----
 .../api/serialization/types/IntType.java        |  74 ----
 .../api/serialization/types/LongType.java       |  74 ----
 .../types/SerializationTestType.java            |  32 --
 .../types/SerializationTestTypeFactory.java     |  47 ---
 .../api/serialization/types/ShortType.java      |  74 ----
 .../serialization/types/UnsignedByteType.java   |  74 ----
 .../serialization/types/UnsignedShortType.java  |  74 ----
 .../network/api/serialization/types/Util.java   | 106 -----
 .../network/serialization/LargeRecordsTest.java |   4 +-
 .../serialization/types/LargeObjectType.java    |   2 +-
 .../runtime/util/DataInputDeserializerTest.java |  59 ---
 .../util/DataInputOutputSerializerTest.java     | 122 ------
 .../source/SerializedCheckpointData.java        |   4 +-
 .../TypeInformationSerializationSchema.java     |   4 +-
 .../StreamElementSerializerTest.java            |   4 +-
 58 files changed, 2131 insertions(+), 2872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 3e0cdb5..96b8879 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
new file mode 100644
index 0000000..088f9d2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A simple and efficient deserializer for the {@link java.io.DataInput} 
interface.
+ */
+public class DataInputDeserializer implements DataInputView, 
java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // 
------------------------------------------------------------------------
+
+       private byte[] buffer;
+
+       private int end;
+
+       private int position;
+
+       // 
------------------------------------------------------------------------
+
+       public DataInputDeserializer() {}
+
+       public DataInputDeserializer(byte[] buffer) {
+               setBuffer(buffer, 0, buffer.length);
+       }
+
+       public DataInputDeserializer(byte[] buffer, int start, int len) {
+               setBuffer(buffer, start, len);
+       }
+
+       public DataInputDeserializer(ByteBuffer buffer) {
+               setBuffer(buffer);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Changing buffers
+       // 
------------------------------------------------------------------------
+
+       public void setBuffer(ByteBuffer buffer) {
+               if (buffer.hasArray()) {
+                       this.buffer = buffer.array();
+                       this.position = buffer.arrayOffset() + 
buffer.position();
+                       this.end = this.position + buffer.remaining();
+               } else if (buffer.isDirect()) {
+                       this.buffer = new byte[buffer.remaining()];
+                       this.position = 0;
+                       this.end = this.buffer.length;
+
+                       buffer.get(this.buffer);
+               } else {
+                       throw new IllegalArgumentException("The given buffer is 
neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+               }
+       }
+
+       public void setBuffer(byte[] buffer, int start, int len) {
+               if (buffer == null) {
+                       throw new NullPointerException();
+               }
+
+               if (start < 0 || len < 0 || start + len > buffer.length) {
+                       throw new IllegalArgumentException();
+               }
+
+               this.buffer = buffer;
+               this.position = start;
+               this.end = start + len;
+       }
+
+       public void releaseArrays() {
+               this.buffer = null;
+       }
+
+       // 
----------------------------------------------------------------------------------------
+       //                               Data Input
+       // 
----------------------------------------------------------------------------------------
+
+       public int available() {
+               if (position < end) {
+                       return end - position;
+               } else {
+                       return 0;
+               }
+       }
+
+       @Override
+       public boolean readBoolean() throws IOException {
+               if (this.position < this.end) {
+                       return this.buffer[this.position++] != 0;
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public byte readByte() throws IOException {
+               if (this.position < this.end) {
+                       return this.buffer[this.position++];
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public char readChar() throws IOException {
+               if (this.position < this.end - 1) {
+                       return (char) (((this.buffer[this.position++] & 0xff) 
<< 8) | (this.buffer[this.position++] & 0xff));
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public double readDouble() throws IOException {
+               return Double.longBitsToDouble(readLong());
+       }
+
+       @Override
+       public float readFloat() throws IOException {
+               return Float.intBitsToFloat(readInt());
+       }
+
+       @Override
+       public void readFully(byte[] b) throws IOException {
+               readFully(b, 0, b.length);
+       }
+
+       @Override
+       public void readFully(byte[] b, int off, int len) throws IOException {
+               if (len >= 0) {
+                       if (off <= b.length - len) {
+                               if (this.position <= this.end - len) {
+                                       System.arraycopy(this.buffer, position, 
b, off, len);
+                                       position += len;
+                               } else {
+                                       throw new EOFException();
+                               }
+                       } else {
+                               throw new ArrayIndexOutOfBoundsException();
+                       }
+               } else if (len < 0) {
+                       throw new IllegalArgumentException("Length may not be 
negative.");
+               }
+       }
+
+       @Override
+       public int readInt() throws IOException {
+               if (this.position >= 0 && this.position < this.end - 3) {
+                       @SuppressWarnings("restriction")
+                       int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + 
this.position);
+                       if (LITTLE_ENDIAN) {
+                               value = Integer.reverseBytes(value);
+                       }
+
+                       this.position += 4;
+                       return value;
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public String readLine() throws IOException {
+               if (this.position < this.end) {
+                       // read until a newline is found
+                       StringBuilder bld = new StringBuilder();
+                       char curr = (char) readUnsignedByte();
+                       while (position < this.end && curr != '\n') {
+                               bld.append(curr);
+                               curr = (char) readUnsignedByte();
+                       }
+                       // trim a trailing carriage return
+                       int len = bld.length();
+                       if (len > 0 && bld.charAt(len - 1) == '\r') {
+                               bld.setLength(len - 1);
+                       }
+                       String s = bld.toString();
+                       bld.setLength(0);
+                       return s;
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       public long readLong() throws IOException {
+               if (position >= 0 && position < this.end - 7) {
+                       @SuppressWarnings("restriction")
+                       long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + 
this.position);
+                       if (LITTLE_ENDIAN) {
+                               value = Long.reverseBytes(value);
+                       }
+                       this.position += 8;
+                       return value;
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public short readShort() throws IOException {
+               if (position >= 0 && position < this.end - 1) {
+                       return (short) ((((this.buffer[position++]) & 0xff) << 
8) | ((this.buffer[position++]) & 0xff));
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public String readUTF() throws IOException {
+               int utflen = readUnsignedShort();
+               byte[] bytearr = new byte[utflen];
+               char[] chararr = new char[utflen];
+
+               int c, char2, char3;
+               int count = 0;
+               int chararrCount = 0;
+
+               readFully(bytearr, 0, utflen);
+
+               while (count < utflen) {
+                       c = (int) bytearr[count] & 0xff;
+                       if (c > 127) {
+                               break;
+                       }
+                       count++;
+                       chararr[chararrCount++] = (char) c;
+               }
+
+               while (count < utflen) {
+                       c = (int) bytearr[count] & 0xff;
+                       switch (c >> 4) {
+                       case 0:
+                       case 1:
+                       case 2:
+                       case 3:
+                       case 4:
+                       case 5:
+                       case 6:
+                       case 7:
+                               /* 0xxxxxxx */
+                               count++;
+                               chararr[chararrCount++] = (char) c;
+                               break;
+                       case 12:
+                       case 13:
+                               /* 110x xxxx 10xx xxxx */
+                               count += 2;
+                               if (count > utflen) {
+                                       throw new 
UTFDataFormatException("malformed input: partial character at end");
+                               }
+                               char2 = (int) bytearr[count - 1];
+                               if ((char2 & 0xC0) != 0x80) {
+                                       throw new 
UTFDataFormatException("malformed input around byte " + count);
+                               }
+                               chararr[chararrCount++] = (char) (((c & 0x1F) 
<< 6) | (char2 & 0x3F));
+                               break;
+                       case 14:
+                               /* 1110 xxxx 10xx xxxx 10xx xxxx */
+                               count += 3;
+                               if (count > utflen) {
+                                       throw new 
UTFDataFormatException("malformed input: partial character at end");
+                               }
+                               char2 = (int) bytearr[count - 2];
+                               char3 = (int) bytearr[count - 1];
+                               if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) 
!= 0x80)) {
+                                       throw new 
UTFDataFormatException("malformed input around byte " + (count - 1));
+                               }
+                               chararr[chararrCount++] = (char) (((c & 0x0F) 
<< 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+                               break;
+                       default:
+                               /* 10xx xxxx, 1111 xxxx */
+                               throw new UTFDataFormatException("malformed 
input around byte " + count);
+                       }
+               }
+               // The number of chars produced may be less than utflen
+               return new String(chararr, 0, chararrCount);
+       }
+
+       @Override
+       public int readUnsignedByte() throws IOException {
+               if (this.position < this.end) {
+                       return (this.buffer[this.position++] & 0xff);
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public int readUnsignedShort() throws IOException {
+               if (this.position < this.end - 1) {
+                       return ((this.buffer[this.position++] & 0xff) << 8) | 
(this.buffer[this.position++] & 0xff);
+               } else {
+                       throw new EOFException();
+               }
+       }
+
+       @Override
+       public int skipBytes(int n) throws IOException {
+               if (this.position <= this.end - n) {
+                       this.position += n;
+                       return n;
+               } else {
+                       n = this.end - this.position;
+                       this.position = this.end;
+                       return n;
+               }
+       }
+
+       @Override
+       public void skipBytesToRead(int numBytes) throws IOException {
+               int skippedBytes = skipBytes(numBytes);
+
+               if (skippedBytes < numBytes){
+                       throw new EOFException("Could not skip " + numBytes + " 
bytes.");
+               }
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               if (b == null){
+                       throw new NullPointerException("Byte array b cannot be 
null.");
+               }
+
+               if (off < 0){
+                       throw new IndexOutOfBoundsException("Offset cannot be 
negative.");
+               }
+
+               if (len < 0){
+                       throw new IndexOutOfBoundsException("Length cannot be 
negative.");
+               }
+
+               if (b.length - off < len){
+                       throw new IndexOutOfBoundsException("Byte array does 
not provide enough space to store requested data" +
+                                       ".");
+               }
+
+               if (this.position >= this.end) {
+                       return -1;
+               } else {
+                       int toRead = Math.min(this.end - this.position, len);
+                       System.arraycopy(this.buffer, this.position, b, off, 
toRead);
+                       this.position += toRead;
+
+                       return toRead;
+               }
+       }
+
+       @Override
+       public int read(byte[] b) throws IOException {
+               return read(b, 0, b.length);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+
+       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
new file mode 100644
index 0000000..7b8acb7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+/**
+ * A simple and efficient serializer for the {@link java.io.DataOutput} 
interface.
+ */
+public class DataOutputSerializer implements DataOutputView {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DataOutputSerializer.class);
+
+       private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
+
+       // 
------------------------------------------------------------------------
+
+       private final byte[] startBuffer;
+
+       private byte[] buffer;
+
+       private int position;
+
+       private ByteBuffer wrapper;
+
+       // 
------------------------------------------------------------------------
+
+       public DataOutputSerializer(int startSize) {
+               if (startSize < 1) {
+                       throw new IllegalArgumentException();
+               }
+
+               this.startBuffer = new byte[startSize];
+               this.buffer = this.startBuffer;
+               this.wrapper = ByteBuffer.wrap(buffer);
+       }
+
+       public ByteBuffer wrapAsByteBuffer() {
+               this.wrapper.position(0);
+               this.wrapper.limit(this.position);
+               return this.wrapper;
+       }
+
+       public byte[] getByteArray() {
+               return buffer;
+       }
+
+       public byte[] getCopyOfBuffer() {
+               return Arrays.copyOf(buffer, position);
+       }
+
+       public void clear() {
+               this.position = 0;
+       }
+
+       public int length() {
+               return this.position;
+       }
+
+       public void pruneBuffer() {
+               if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Releasing serialization buffer of " 
+ this.buffer.length + " bytes.");
+                       }
+
+                       this.buffer = this.startBuffer;
+                       this.wrapper = ByteBuffer.wrap(this.buffer);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return String.format("[pos=%d cap=%d]", this.position, 
this.buffer.length);
+       }
+
+       // 
----------------------------------------------------------------------------------------
+       //                               Data Output
+       // 
----------------------------------------------------------------------------------------
+
+       @Override
+       public void write(int b) throws IOException {
+               if (this.position >= this.buffer.length) {
+                       resize(1);
+               }
+               this.buffer[this.position++] = (byte) (b & 0xff);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               write(b, 0, b.length);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               if (len < 0 || off > b.length - len) {
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               if (this.position > this.buffer.length - len) {
+                       resize(len);
+               }
+               System.arraycopy(b, off, this.buffer, this.position, len);
+               this.position += len;
+       }
+
+       @Override
+       public void writeBoolean(boolean v) throws IOException {
+               write(v ? 1 : 0);
+       }
+
+       @Override
+       public void writeByte(int v) throws IOException {
+               write(v);
+       }
+
+       @Override
+       public void writeBytes(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - sLen) {
+                       resize(sLen);
+               }
+
+               for (int i = 0; i < sLen; i++) {
+                       writeByte(s.charAt(i));
+               }
+               this.position += sLen;
+       }
+
+       @Override
+       public void writeChar(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) (v >> 8);
+               this.buffer[this.position++] = (byte) v;
+       }
+
+       @Override
+       public void writeChars(String s) throws IOException {
+               final int sLen = s.length();
+               if (this.position >= this.buffer.length - 2 * sLen) {
+                       resize(2 * sLen);
+               }
+               for (int i = 0; i < sLen; i++) {
+                       writeChar(s.charAt(i));
+               }
+       }
+
+       @Override
+       public void writeDouble(double v) throws IOException {
+               writeLong(Double.doubleToLongBits(v));
+       }
+
+       @Override
+       public void writeFloat(float v) throws IOException {
+               writeInt(Float.floatToIntBits(v));
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeInt(int v) throws IOException {
+               if (this.position >= this.buffer.length - 3) {
+                       resize(4);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Integer.reverseBytes(v);
+               }
+               UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 4;
+       }
+
+       @SuppressWarnings("restriction")
+       @Override
+       public void writeLong(long v) throws IOException {
+               if (this.position >= this.buffer.length - 7) {
+                       resize(8);
+               }
+               if (LITTLE_ENDIAN) {
+                       v = Long.reverseBytes(v);
+               }
+               UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+               this.position += 8;
+       }
+
+       @Override
+       public void writeShort(int v) throws IOException {
+               if (this.position >= this.buffer.length - 1) {
+                       resize(2);
+               }
+               this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+               this.buffer[this.position++] = (byte) (v & 0xff);
+       }
+
+       @Override
+       public void writeUTF(String str) throws IOException {
+               int strlen = str.length();
+               int utflen = 0;
+               int c;
+
+               /* use charAt instead of copying String to char array */
+               for (int i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               utflen++;
+                       } else if (c > 0x07FF) {
+                               utflen += 3;
+                       } else {
+                               utflen += 2;
+                       }
+               }
+
+               if (utflen > 65535) {
+                       throw new UTFDataFormatException("Encoded string is too 
long: " + utflen);
+               }
+               else if (this.position > this.buffer.length - utflen - 2) {
+                       resize(utflen + 2);
+               }
+
+               byte[] bytearr = this.buffer;
+               int count = this.position;
+
+               bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+               bytearr[count++] = (byte) (utflen & 0xFF);
+
+               int i;
+               for (i = 0; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if (!((c >= 0x0001) && (c <= 0x007F))) {
+                               break;
+                       }
+                       bytearr[count++] = (byte) c;
+               }
+
+               for (; i < strlen; i++) {
+                       c = str.charAt(i);
+                       if ((c >= 0x0001) && (c <= 0x007F)) {
+                               bytearr[count++] = (byte) c;
+
+                       } else if (c > 0x07FF) {
+                               bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 
0x0F));
+                               bytearr[count++] = (byte) (0x80 | ((c >>  6) & 
0x3F));
+                               bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+                       } else {
+                               bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 
0x1F));
+                               bytearr[count++] = (byte) (0x80 | (c & 0x3F));
+                       }
+               }
+
+               this.position = count;
+       }
+
+       private void resize(int minCapacityAdd) throws IOException {
+               int newLen = Math.max(this.buffer.length * 2, 
this.buffer.length + minCapacityAdd);
+               byte[] nb;
+               try {
+                       nb = new byte[newLen];
+               }
+               catch (NegativeArraySizeException e) {
+                       throw new IOException("Serialization failed because the 
record length would exceed 2GB (max addressable array size in Java).");
+               }
+               catch (OutOfMemoryError e) {
+                       // this was too large to allocate, try the smaller size 
(if possible)
+                       if (newLen > this.buffer.length + minCapacityAdd) {
+                               newLen = this.buffer.length + minCapacityAdd;
+                               try {
+                                       nb = new byte[newLen];
+                               }
+                               catch (OutOfMemoryError ee) {
+                                       // still not possible. give an 
informative exception message that reports the size
+                                       throw new IOException("Failed to 
serialize element. Serialized size (> "
+                                                       + newLen + " bytes) 
exceeds JVM heap space", ee);
+                               }
+                       } else {
+                               throw new IOException("Failed to serialize 
element. Serialized size (> "
+                                               + newLen + " bytes) exceeds JVM 
heap space", e);
+                       }
+               }
+
+               System.arraycopy(this.buffer, 0, nb, 0, this.position);
+               this.buffer = nb;
+               this.wrapper = ByteBuffer.wrap(this.buffer);
+       }
+
+       @Override
+       public void skipBytesToWrite(int numBytes) throws IOException {
+               if (buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not skip " + numBytes + " 
bytes.");
+               }
+
+               this.position += numBytes;
+       }
+
+       @Override
+       public void write(DataInputView source, int numBytes) throws 
IOException {
+               if (buffer.length - this.position < numBytes){
+                       throw new EOFException("Could not write " + numBytes + 
" bytes. Buffer overflow.");
+               }
+
+               source.readFully(this.buffer, this.position, numBytes);
+               this.position += numBytes;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("restriction")
+       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+       @SuppressWarnings("restriction")
+       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
+
+       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
new file mode 100644
index 0000000..26d407d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test suite for the {@link DataInputDeserializer} class.
+ */
+public class DataInputDeserializerTest {
+
+       @Test
+       public void testAvailable() throws Exception {
+               byte[] bytes;
+               DataInputDeserializer dis;
+
+               bytes = new byte[] {};
+               dis = new DataInputDeserializer(bytes, 0, bytes.length);
+               Assert.assertEquals(bytes.length, dis.available());
+
+               bytes = new byte[] {1, 2, 3};
+               dis = new DataInputDeserializer(bytes, 0, bytes.length);
+               Assert.assertEquals(bytes.length, dis.available());
+
+               dis.readByte();
+               Assert.assertEquals(2, dis.available());
+               dis.readByte();
+               Assert.assertEquals(1, dis.available());
+               dis.readByte();
+               Assert.assertEquals(0, dis.available());
+
+               try {
+                       dis.readByte();
+                       Assert.fail("Did not throw expected IOException");
+               } catch (IOException e) {
+                       // ignore
+               }
+               Assert.assertEquals(0, dis.available());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
new file mode 100644
index 0000000..02a7ea7
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.testutils.serialization.types.Util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+/**
+ * Tests for the combination of {@link DataOutputSerializer} and {@link 
DataInputDeserializer}.
+ */
+public class DataInputOutputSerializerTest {
+
+       @Test
+       public void testWrapAsByteBuffer() {
+               SerializationTestType randomInt = 
Util.randomRecord(SerializationTestTypeFactory.INT);
+
+               DataOutputSerializer serializer = new 
DataOutputSerializer(randomInt.length());
+               MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(randomInt.length());
+
+               try {
+                       // empty buffer, read buffer should be empty
+                       ByteBuffer wrapper = serializer.wrapAsByteBuffer();
+
+                       Assert.assertEquals(0, wrapper.position());
+                       Assert.assertEquals(0, wrapper.limit());
+
+                       // write to data output, read buffer should still be 
empty
+                       randomInt.write(serializer);
+
+                       Assert.assertEquals(0, wrapper.position());
+                       Assert.assertEquals(0, wrapper.limit());
+
+                       // get updated read buffer, read buffer should contain 
written data
+                       wrapper = serializer.wrapAsByteBuffer();
+
+                       Assert.assertEquals(0, wrapper.position());
+                       Assert.assertEquals(randomInt.length(), 
wrapper.limit());
+
+                       // clear data output, read buffer should still contain 
written data
+                       serializer.clear();
+
+                       Assert.assertEquals(0, wrapper.position());
+                       Assert.assertEquals(randomInt.length(), 
wrapper.limit());
+
+                       // get updated read buffer, should be empty
+                       wrapper = serializer.wrapAsByteBuffer();
+
+                       Assert.assertEquals(0, wrapper.position());
+                       Assert.assertEquals(0, wrapper.limit());
+
+                       // write to data output and read back to memory
+                       randomInt.write(serializer);
+                       wrapper = serializer.wrapAsByteBuffer();
+
+                       segment.put(0, wrapper, randomInt.length());
+
+                       Assert.assertEquals(randomInt.length(), 
wrapper.position());
+                       Assert.assertEquals(randomInt.length(), 
wrapper.limit());
+               } catch (IOException e) {
+                       e.printStackTrace();
+                       Assert.fail("Test encountered an unexpected 
exception.");
+               }
+       }
+
+       @Test
+       public void testRandomValuesWriteRead() {
+               final int numElements = 100000;
+               final ArrayDeque<SerializationTestType> reference = new 
ArrayDeque<>();
+
+               DataOutputSerializer serializer = new DataOutputSerializer(1);
+
+               for (SerializationTestType value : 
Util.randomRecords(numElements)) {
+                       reference.add(value);
+
+                       try {
+                               value.write(serializer);
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                               Assert.fail("Test encountered an unexpected 
exception.");
+                       }
+               }
+
+               DataInputDeserializer deserializer = new 
DataInputDeserializer(serializer.wrapAsByteBuffer());
+
+               for (SerializationTestType expected : reference) {
+                       try {
+                               SerializationTestType actual = 
expected.getClass().newInstance();
+                               actual.read(deserializer);
+
+                               Assert.assertEquals(expected, actual);
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               Assert.fail("Test encountered an unexpected 
exception.");
+                       }
+               }
+
+               reference.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
new file mode 100644
index 0000000..d845a39
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class AsciiStringType implements SerializationTestType {
+
+       private static final int MAX_LEN = 1500;
+
+       public String value;
+
+       public AsciiStringType() {
+               this.value = "";
+       }
+
+       private AsciiStringType(String value) {
+               this.value = value;
+       }
+
+       @Override
+       public AsciiStringType getRandom(Random rnd) {
+               final StringBuilder bld = new StringBuilder();
+               final int len = rnd.nextInt(MAX_LEN + 1);
+
+               for (int i = 0; i < len; i++) {
+                       // 1--127
+                       bld.append((char) (rnd.nextInt(126) + 1));
+               }
+
+               return new AsciiStringType(bld.toString());
+       }
+
+       @Override
+       public int length() {
+               return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 
2;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeUTF(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readUTF();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof AsciiStringType) {
+                       AsciiStringType other = (AsciiStringType) obj;
+                       return this.value.equals(other.value);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
new file mode 100644
index 0000000..cb696b1
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class BooleanType implements SerializationTestType {
+
+       private boolean value;
+
+       public BooleanType() {
+               this.value = false;
+       }
+
+       private BooleanType(boolean value) {
+               this.value = value;
+       }
+
+       @Override
+       public BooleanType getRandom(Random rnd) {
+               return new BooleanType(rnd.nextBoolean());
+       }
+
+       @Override
+       public int length() {
+               return 1;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeBoolean(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readBoolean();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value ? 1 : 0;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof BooleanType) {
+                       BooleanType other = (BooleanType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
new file mode 100644
index 0000000..c33e43d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteArrayType implements SerializationTestType {
+
+       private static final int MAX_LEN = 512 * 15;
+
+       private byte[] data;
+
+       public ByteArrayType() {
+               this.data = new byte[0];
+       }
+
+       public ByteArrayType(byte[] data) {
+               this.data = data;
+       }
+
+       @Override
+       public ByteArrayType getRandom(Random rnd) {
+               final int len = rnd.nextInt(MAX_LEN) + 1;
+               final byte[] data = new byte[len];
+               rnd.nextBytes(data);
+               return new ByteArrayType(data);
+       }
+
+       @Override
+       public int length() {
+               return data.length + 4;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(this.data.length);
+               out.write(this.data);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               final int len = in.readInt();
+               this.data = new byte[len];
+               in.readFully(this.data);
+       }
+
+       @Override
+       public int hashCode() {
+               return Arrays.hashCode(this.data);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ByteArrayType) {
+                       ByteArrayType other = (ByteArrayType) obj;
+                       return Arrays.equals(this.data, other.data);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
new file mode 100644
index 0000000..2d5a48a
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteSubArrayType implements SerializationTestType {
+
+       private static final int MAX_LEN = 512;
+
+       private final byte[] data;
+
+       private int len;
+
+       public ByteSubArrayType() {
+               this.data = new byte[MAX_LEN];
+               this.len = 0;
+       }
+
+       @Override
+       public ByteSubArrayType getRandom(Random rnd) {
+               final int len = rnd.nextInt(MAX_LEN) + 1;
+               final ByteSubArrayType t = new ByteSubArrayType();
+               t.len = len;
+
+               final byte[] data = t.data;
+               for (int i = 0; i < len; i++) {
+                       data[i] = (byte) rnd.nextInt(256);
+               }
+
+               return t;
+       }
+
+       @Override
+       public int length() {
+               return len + 4;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(this.len);
+               out.write(this.data, 0, this.len);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.len = in.readInt();
+               in.readFully(this.data, 0, this.len);
+       }
+
+       @Override
+       public int hashCode() {
+               final byte[] copy = new byte[this.len];
+               System.arraycopy(this.data, 0, copy, 0, this.len);
+               return Arrays.hashCode(copy);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ByteSubArrayType) {
+                       ByteSubArrayType other = (ByteSubArrayType) obj;
+                       if (this.len == other.len) {
+                               for (int i = 0; i < this.len; i++) {
+                                       if (this.data[i] != other.data[i]) {
+                                               return false;
+                                       }
+                               }
+                               return true;
+                       } else {
+                               return false;
+                       }
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
new file mode 100644
index 0000000..8b843be
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ByteType implements SerializationTestType {
+
+       private byte value;
+
+       public ByteType() {
+               this.value = (byte) 0;
+       }
+
+       private ByteType(byte value) {
+               this.value = value;
+       }
+
+       @Override
+       public ByteType getRandom(Random rnd) {
+               return new ByteType((byte) rnd.nextInt(256));
+       }
+
+       @Override
+       public int length() {
+               return 1;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeByte(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readByte();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ByteType) {
+                       ByteType other = (ByteType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
new file mode 100644
index 0000000..962de7f
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class CharType implements SerializationTestType {
+
+       private char value;
+
+       public CharType() {
+               this.value = 0;
+       }
+
+       private CharType(char value) {
+               this.value = value;
+       }
+
+       @Override
+       public CharType getRandom(Random rnd) {
+               return new CharType((char) rnd.nextInt(10000));
+       }
+
+       @Override
+       public int length() {
+               return 2;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeChar(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readChar();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof CharType) {
+                       CharType other = (CharType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
new file mode 100644
index 0000000..7119c34
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class DoubleType implements SerializationTestType {
+
+       private double value;
+
+       public DoubleType() {
+               this.value = 0;
+       }
+
+       private DoubleType(double value) {
+               this.value = value;
+       }
+
+       @Override
+       public DoubleType getRandom(Random rnd) {
+               return new DoubleType(rnd.nextDouble());
+       }
+
+       @Override
+       public int length() {
+               return 8;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeDouble(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readDouble();
+       }
+
+       @Override
+       public int hashCode() {
+               final long l = Double.doubleToLongBits(this.value);
+               return (int) (l ^ l >>> 32);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof DoubleType) {
+                       DoubleType other = (DoubleType) obj;
+                       return Double.doubleToLongBits(this.value) == 
Double.doubleToLongBits(other.value);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
new file mode 100644
index 0000000..9fa6e63
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class FloatType implements SerializationTestType {
+
+       private float value;
+
+       public FloatType() {
+               this.value = 0;
+       }
+
+       private FloatType(float value) {
+               this.value = value;
+       }
+
+       @Override
+       public FloatType getRandom(Random rnd) {
+               return new FloatType(rnd.nextFloat());
+       }
+
+       @Override
+       public int length() {
+               return 4;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeFloat(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readFloat();
+       }
+
+       @Override
+       public int hashCode() {
+               return Float.floatToIntBits(this.value);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof FloatType) {
+                       FloatType other = (FloatType) obj;
+                       return Float.floatToIntBits(this.value) == 
Float.floatToIntBits(other.value);
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
new file mode 100644
index 0000000..52313ab
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class IntType implements SerializationTestType {
+
+       private int value;
+
+       public IntType() {
+               this.value = 0;
+       }
+
+       public IntType(int value) {
+               this.value = value;
+       }
+
+       @Override
+       public IntType getRandom(Random rnd) {
+               return new IntType(rnd.nextInt());
+       }
+
+       @Override
+       public int length() {
+               return 4;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readInt();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof IntType) {
+                       IntType other = (IntType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
new file mode 100644
index 0000000..3e47b4b
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class LongType implements SerializationTestType {
+
+       private long value;
+
+       public LongType() {
+               this.value = 0;
+       }
+
+       private LongType(long value) {
+               this.value = value;
+       }
+
+       @Override
+       public LongType getRandom(Random rnd) {
+               return new LongType(rnd.nextLong());
+       }
+
+       @Override
+       public int length() {
+               return 8;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeLong(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readLong();
+       }
+
+       @Override
+       public int hashCode() {
+               return (int) (this.value ^ this.value >>> 32);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof LongType) {
+                       LongType other = (LongType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
new file mode 100644
index 0000000..1edd796
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.util.Random;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+public interface SerializationTestType extends IOReadableWritable {
+
+       public SerializationTestType getRandom(Random rnd);
+
+       public int length();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
new file mode 100644
index 0000000..392e15a
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+public enum SerializationTestTypeFactory {
+       
+       BOOLEAN(new BooleanType()),
+       BYTE_ARRAY(new ByteArrayType()),
+       BYTE_SUB_ARRAY(new ByteSubArrayType()),
+       BYTE(new ByteType()),
+       CHAR(new CharType()),
+       DOUBLE(new DoubleType()),
+       FLOAT(new FloatType()),
+       INT(new IntType()),
+       LONG(new LongType()),
+       SHORT(new ShortType()),
+       UNSIGNED_BYTE(new UnsignedByteType()),
+       UNSIGNED_SHORT(new UnsignedShortType()),
+       STRING(new AsciiStringType());
+
+       private final SerializationTestType factory;
+
+       SerializationTestTypeFactory(SerializationTestType type) {
+               this.factory = type;
+       }
+
+       public SerializationTestType factory() {
+               return this.factory;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
new file mode 100644
index 0000000..b5b3c27
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class ShortType implements SerializationTestType {
+
+       private short value;
+
+       public ShortType() {
+               this.value = (short) 0;
+       }
+
+       private ShortType(short value) {
+               this.value = value;
+       }
+
+       @Override
+       public ShortType getRandom(Random rnd) {
+               return new ShortType((short) rnd.nextInt(65536));
+       }
+
+       @Override
+       public int length() {
+               return 2;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeShort(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readShort();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof ShortType) {
+                       ShortType other = (ShortType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
new file mode 100644
index 0000000..29b897a
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class UnsignedByteType implements SerializationTestType {
+
+       private int value;
+
+       public UnsignedByteType() {
+               this.value = 0;
+       }
+
+       private UnsignedByteType(int value) {
+               this.value = value;
+       }
+
+       @Override
+       public UnsignedByteType getRandom(Random rnd) {
+               return new UnsignedByteType(rnd.nextInt(128) + 128);
+       }
+
+       @Override
+       public int length() {
+               return 1;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeByte(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readUnsignedByte();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof UnsignedByteType) {
+                       UnsignedByteType other = (UnsignedByteType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
new file mode 100644
index 0000000..47d0156
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class UnsignedShortType implements SerializationTestType {
+
+       private int value;
+
+       public UnsignedShortType() {
+               this.value = 0;
+       }
+
+       private UnsignedShortType(int value) {
+               this.value = value;
+       }
+
+       @Override
+       public UnsignedShortType getRandom(Random rnd) {
+               return new UnsignedShortType(rnd.nextInt(32768) + 32768);
+       }
+
+       @Override
+       public int length() {
+               return 2;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeShort(this.value);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               this.value = in.readUnsignedShort();
+       }
+
+       @Override
+       public int hashCode() {
+               return this.value;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof UnsignedShortType) {
+                       UnsignedShortType other = (UnsignedShortType) obj;
+                       return this.value == other.value;
+               } else {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
new file mode 100644
index 0000000..b34701f
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.serialization.types;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+/**
+ * Utility class to help serialization for testing.
+ */
+public final class Util {
+
+       private static final long SEED = 64871654635745873L;
+
+       private static Random random = new Random(SEED);
+
+       public static SerializationTestType 
randomRecord(SerializationTestTypeFactory type) {
+               return type.factory().getRandom(Util.random);
+       }
+
+       public static MockRecords randomRecords(final int numElements, final 
SerializationTestTypeFactory type) {
+
+               return new MockRecords(numElements) {
+                       @Override
+                       protected SerializationTestType getRecord() {
+                               return type.factory().getRandom(Util.random);
+                       }
+               };
+       }
+
+       public static MockRecords randomRecords(final int numElements) {
+
+               return new MockRecords(numElements) {
+                       @Override
+                       protected SerializationTestType getRecord() {
+                               // select random test type factory
+                               SerializationTestTypeFactory[] types = 
SerializationTestTypeFactory.values();
+                               int i = Util.random.nextInt(types.length);
+
+                               return 
types[i].factory().getRandom(Util.random);
+                       }
+               };
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       public abstract static class MockRecords implements 
Iterable<SerializationTestType> {
+
+               private int numRecords;
+
+               public MockRecords(int numRecords) {
+                       this.numRecords = numRecords;
+               }
+
+               @Override
+               public Iterator<SerializationTestType> iterator() {
+                       return new Iterator<SerializationTestType>() {
+                               @Override
+                               public boolean hasNext() {
+                                       return numRecords > 0;
+                               }
+
+                               @Override
+                               public SerializationTestType next() {
+                                       if (numRecords > 0) {
+                                               numRecords--;
+
+                                               return getRecord();
+                                       }
+
+                                       throw new NoSuchElementException();
+                               }
+
+                               @Override
+                               public void remove() {
+                                       throw new 
UnsupportedOperationException();
+                               }
+                       };
+               }
+
+               abstract protected SerializationTestType getRecord();
+       }
+
+       /**
+        * No instantiation.
+        */
+       private Util() {
+               throw new RuntimeException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
deleted file mode 100644
index 878df85..0000000
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client.state.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemoryUtils;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-/**
- * A simple and efficient deserializer for the {@link java.io.DataInput} 
interface.
- *
- * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
- */
-public class DataInputDeserializer implements DataInputView, 
java.io.Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       // 
------------------------------------------------------------------------
-
-       private byte[] buffer;
-
-       private int end;
-
-       private int position;
-
-       // 
------------------------------------------------------------------------
-
-       public DataInputDeserializer() {}
-
-       public DataInputDeserializer(byte[] buffer) {
-               setBuffer(buffer, 0, buffer.length);
-       }
-
-       public DataInputDeserializer(byte[] buffer, int start, int len) {
-               setBuffer(buffer, start, len);
-       }
-
-       public DataInputDeserializer(ByteBuffer buffer) {
-               setBuffer(buffer);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Changing buffers
-       // 
------------------------------------------------------------------------
-
-       public void setBuffer(ByteBuffer buffer) {
-               if (buffer.hasArray()) {
-                       this.buffer = buffer.array();
-                       this.position = buffer.arrayOffset() + 
buffer.position();
-                       this.end = this.position + buffer.remaining();
-               } else if (buffer.isDirect()) {
-                       this.buffer = new byte[buffer.remaining()];
-                       this.position = 0;
-                       this.end = this.buffer.length;
-
-                       buffer.get(this.buffer);
-               } else {
-                       throw new IllegalArgumentException("The given buffer is 
neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
-               }
-       }
-
-       public void setBuffer(byte[] buffer, int start, int len) {
-               if (buffer == null) {
-                       throw new NullPointerException();
-               }
-
-               if (start < 0 || len < 0 || start + len > buffer.length) {
-                       throw new IllegalArgumentException();
-               }
-
-               this.buffer = buffer;
-               this.position = start;
-               this.end = start + len;
-       }
-
-       public void releaseArrays() {
-               this.buffer = null;
-       }
-
-       // 
----------------------------------------------------------------------------------------
-       //                               Data Input
-       // 
----------------------------------------------------------------------------------------
-
-       public int available() {
-               if (position < end) {
-                       return end - position;
-               } else {
-                       return 0;
-               }
-       }
-
-       @Override
-       public boolean readBoolean() throws IOException {
-               if (this.position < this.end) {
-                       return this.buffer[this.position++] != 0;
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public byte readByte() throws IOException {
-               if (this.position < this.end) {
-                       return this.buffer[this.position++];
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public char readChar() throws IOException {
-               if (this.position < this.end - 1) {
-                       return (char) (((this.buffer[this.position++] & 0xff) 
<< 8) | (this.buffer[this.position++] & 0xff));
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public double readDouble() throws IOException {
-               return Double.longBitsToDouble(readLong());
-       }
-
-       @Override
-       public float readFloat() throws IOException {
-               return Float.intBitsToFloat(readInt());
-       }
-
-       @Override
-       public void readFully(byte[] b) throws IOException {
-               readFully(b, 0, b.length);
-       }
-
-       @Override
-       public void readFully(byte[] b, int off, int len) throws IOException {
-               if (len >= 0) {
-                       if (off <= b.length - len) {
-                               if (this.position <= this.end - len) {
-                                       System.arraycopy(this.buffer, position, 
b, off, len);
-                                       position += len;
-                               } else {
-                                       throw new EOFException();
-                               }
-                       } else {
-                               throw new ArrayIndexOutOfBoundsException();
-                       }
-               } else if (len < 0) {
-                       throw new IllegalArgumentException("Length may not be 
negative.");
-               }
-       }
-
-       @Override
-       public int readInt() throws IOException {
-               if (this.position >= 0 && this.position < this.end - 3) {
-                       @SuppressWarnings("restriction")
-                       int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + 
this.position);
-                       if (LITTLE_ENDIAN) {
-                               value = Integer.reverseBytes(value);
-                       }
-
-                       this.position += 4;
-                       return value;
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public String readLine() throws IOException {
-               if (this.position < this.end) {
-                       // read until a newline is found
-                       StringBuilder bld = new StringBuilder();
-                       char curr = (char) readUnsignedByte();
-                       while (position < this.end && curr != '\n') {
-                               bld.append(curr);
-                               curr = (char) readUnsignedByte();
-                       }
-                       // trim a trailing carriage return
-                       int len = bld.length();
-                       if (len > 0 && bld.charAt(len - 1) == '\r') {
-                               bld.setLength(len - 1);
-                       }
-                       String s = bld.toString();
-                       bld.setLength(0);
-                       return s;
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public long readLong() throws IOException {
-               if (position >= 0 && position < this.end - 7) {
-                       @SuppressWarnings("restriction")
-                       long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + 
this.position);
-                       if (LITTLE_ENDIAN) {
-                               value = Long.reverseBytes(value);
-                       }
-                       this.position += 8;
-                       return value;
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public short readShort() throws IOException {
-               if (position >= 0 && position < this.end - 1) {
-                       return (short) ((((this.buffer[position++]) & 0xff) << 
8) | ((this.buffer[position++]) & 0xff));
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public String readUTF() throws IOException {
-               int utflen = readUnsignedShort();
-               byte[] bytearr = new byte[utflen];
-               char[] chararr = new char[utflen];
-
-               int c, char2, char3;
-               int count = 0;
-               int chararrCount = 0;
-
-               readFully(bytearr, 0, utflen);
-
-               while (count < utflen) {
-                       c = (int) bytearr[count] & 0xff;
-                       if (c > 127) {
-                               break;
-                       }
-                       count++;
-                       chararr[chararrCount++] = (char) c;
-               }
-
-               while (count < utflen) {
-                       c = (int) bytearr[count] & 0xff;
-                       switch (c >> 4) {
-                       case 0:
-                       case 1:
-                       case 2:
-                       case 3:
-                       case 4:
-                       case 5:
-                       case 6:
-                       case 7:
-                               /* 0xxxxxxx */
-                               count++;
-                               chararr[chararrCount++] = (char) c;
-                               break;
-                       case 12:
-                       case 13:
-                               /* 110x xxxx 10xx xxxx */
-                               count += 2;
-                               if (count > utflen) {
-                                       throw new 
UTFDataFormatException("malformed input: partial character at end");
-                               }
-                               char2 = (int) bytearr[count - 1];
-                               if ((char2 & 0xC0) != 0x80) {
-                                       throw new 
UTFDataFormatException("malformed input around byte " + count);
-                               }
-                               chararr[chararrCount++] = (char) (((c & 0x1F) 
<< 6) | (char2 & 0x3F));
-                               break;
-                       case 14:
-                               /* 1110 xxxx 10xx xxxx 10xx xxxx */
-                               count += 3;
-                               if (count > utflen) {
-                                       throw new 
UTFDataFormatException("malformed input: partial character at end");
-                               }
-                               char2 = (int) bytearr[count - 2];
-                               char3 = (int) bytearr[count - 1];
-                               if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) 
!= 0x80)) {
-                                       throw new 
UTFDataFormatException("malformed input around byte " + (count - 1));
-                               }
-                               chararr[chararrCount++] = (char) (((c & 0x0F) 
<< 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
-                               break;
-                       default:
-                               /* 10xx xxxx, 1111 xxxx */
-                               throw new UTFDataFormatException("malformed 
input around byte " + count);
-                       }
-               }
-               // The number of chars produced may be less than utflen
-               return new String(chararr, 0, chararrCount);
-       }
-
-       @Override
-       public int readUnsignedByte() throws IOException {
-               if (this.position < this.end) {
-                       return (this.buffer[this.position++] & 0xff);
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public int readUnsignedShort() throws IOException {
-               if (this.position < this.end - 1) {
-                       return ((this.buffer[this.position++] & 0xff) << 8) | 
(this.buffer[this.position++] & 0xff);
-               } else {
-                       throw new EOFException();
-               }
-       }
-
-       @Override
-       public int skipBytes(int n) throws IOException {
-               if (this.position <= this.end - n) {
-                       this.position += n;
-                       return n;
-               } else {
-                       n = this.end - this.position;
-                       this.position = this.end;
-                       return n;
-               }
-       }
-
-       @Override
-       public void skipBytesToRead(int numBytes) throws IOException {
-               int skippedBytes = skipBytes(numBytes);
-
-               if (skippedBytes < numBytes){
-                       throw new EOFException("Could not skip " + numBytes + " 
bytes.");
-               }
-       }
-
-       @Override
-       public int read(byte[] b, int off, int len) throws IOException {
-               if (b == null){
-                       throw new NullPointerException("Byte array b cannot be 
null.");
-               }
-
-               if (off < 0){
-                       throw new IndexOutOfBoundsException("Offset cannot be 
negative.");
-               }
-
-               if (len < 0){
-                       throw new IndexOutOfBoundsException("Length cannot be 
negative.");
-               }
-
-               if (b.length - off < len){
-                       throw new IndexOutOfBoundsException("Byte array does 
not provide enough space to store requested data" +
-                                       ".");
-               }
-
-               if (this.position >= this.end) {
-                       return -1;
-               } else {
-                       int toRead = Math.min(this.end - this.position, len);
-                       System.arraycopy(this.buffer, this.position, b, off, 
toRead);
-                       this.position += toRead;
-
-                       return toRead;
-               }
-       }
-
-       @Override
-       public int read(byte[] b) throws IOException {
-               return read(b, 0, b.length);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       @SuppressWarnings("restriction")
-       private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
-       @SuppressWarnings("restriction")
-       private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
-
-       private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
-}

Reply via email to