[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 'flink-core'
These core flink utils are independent of any other runtime classes and are also used both in flink-runtime and in flink-queryable-state (which duplicated the code). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37df826e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37df826e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37df826e Branch: refs/heads/master Commit: 37df826e4e1355a2cba89a85ea94257d40785cb2 Parents: 198b74a Author: Stephan Ewen <[email protected]> Authored: Thu Nov 2 18:27:23 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Nov 2 19:20:06 2017 +0100 ---------------------------------------------------------------------- ...eInformationKeyValueSerializationSchema.java | 4 +- .../core/memory/DataInputDeserializer.java | 387 ++++++++++++++++++ .../flink/core/memory/DataOutputSerializer.java | 337 ++++++++++++++++ .../core/memory/DataInputDeserializerTest.java | 59 +++ .../memory/DataInputOutputSerializerTest.java | 123 ++++++ .../serialization/types/AsciiStringType.java | 85 ++++ .../serialization/types/BooleanType.java | 74 ++++ .../serialization/types/ByteArrayType.java | 83 ++++ .../serialization/types/ByteSubArrayType.java | 98 +++++ .../testutils/serialization/types/ByteType.java | 74 ++++ .../testutils/serialization/types/CharType.java | 75 ++++ .../serialization/types/DoubleType.java | 75 ++++ .../serialization/types/FloatType.java | 74 ++++ .../testutils/serialization/types/IntType.java | 74 ++++ .../testutils/serialization/types/LongType.java | 74 ++++ .../types/SerializationTestType.java | 32 ++ .../types/SerializationTestTypeFactory.java | 47 +++ .../serialization/types/ShortType.java | 74 ++++ .../serialization/types/UnsignedByteType.java | 74 ++++ .../serialization/types/UnsignedShortType.java | 74 ++++ .../testutils/serialization/types/Util.java | 106 +++++ .../serialization/DataInputDeserializer.java | 392 ------------------- .../serialization/DataOutputSerializer.java | 344 ---------------- .../state/serialization/KvStateSerializer.java | 2 + .../AdaptiveSpanningRecordDeserializer.java | 4 +- .../api/serialization/EventSerializer.java | 4 +- .../serialization/SpanningRecordSerializer.java | 2 +- ...llingAdaptiveSpanningRecordDeserializer.java | 2 +- .../metrics/dump/MetricDumpSerialization.java | 4 +- .../runtime/util/DataInputDeserializer.java | 390 ------------------ .../runtime/util/DataOutputSerializer.java | 342 ---------------- .../io/network/api/CheckpointBarrierTest.java | 4 +- .../api/serialization/PagedViewsTest.java | 6 +- .../SpanningRecordSerializationTest.java | 6 +- .../SpanningRecordSerializerTest.java | 6 +- .../serialization/types/AsciiStringType.java | 85 ---- .../api/serialization/types/BooleanType.java | 74 ---- .../api/serialization/types/ByteArrayType.java | 83 ---- .../serialization/types/ByteSubArrayType.java | 98 ----- .../api/serialization/types/ByteType.java | 74 ---- .../api/serialization/types/CharType.java | 75 ---- .../api/serialization/types/DoubleType.java | 75 ---- .../api/serialization/types/FloatType.java | 74 ---- .../api/serialization/types/IntType.java | 74 ---- .../api/serialization/types/LongType.java | 74 ---- .../types/SerializationTestType.java | 32 -- .../types/SerializationTestTypeFactory.java | 47 --- .../api/serialization/types/ShortType.java | 74 ---- .../serialization/types/UnsignedByteType.java | 74 ---- .../serialization/types/UnsignedShortType.java | 74 ---- .../network/api/serialization/types/Util.java | 106 ----- .../network/serialization/LargeRecordsTest.java | 4 +- .../serialization/types/LargeObjectType.java | 2 +- .../runtime/util/DataInputDeserializerTest.java | 59 --- .../util/DataInputOutputSerializerTest.java | 122 ------ .../source/SerializedCheckpointData.java | 4 +- .../TypeInformationSerializationSchema.java | 4 +- .../StreamElementSerializerTest.java | 4 +- 58 files changed, 2131 insertions(+), 2872 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index 3e0cdb5..96b8879 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -24,8 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java new file mode 100644 index 0000000..088f9d2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * A simple and efficient deserializer for the {@link java.io.DataInput} interface. + */ +public class DataInputDeserializer implements DataInputView, java.io.Serializable { + + private static final long serialVersionUID = 1L; + + // ------------------------------------------------------------------------ + + private byte[] buffer; + + private int end; + + private int position; + + // ------------------------------------------------------------------------ + + public DataInputDeserializer() {} + + public DataInputDeserializer(byte[] buffer) { + setBuffer(buffer, 0, buffer.length); + } + + public DataInputDeserializer(byte[] buffer, int start, int len) { + setBuffer(buffer, start, len); + } + + public DataInputDeserializer(ByteBuffer buffer) { + setBuffer(buffer); + } + + // ------------------------------------------------------------------------ + // Changing buffers + // ------------------------------------------------------------------------ + + public void setBuffer(ByteBuffer buffer) { + if (buffer.hasArray()) { + this.buffer = buffer.array(); + this.position = buffer.arrayOffset() + buffer.position(); + this.end = this.position + buffer.remaining(); + } else if (buffer.isDirect()) { + this.buffer = new byte[buffer.remaining()]; + this.position = 0; + this.end = this.buffer.length; + + buffer.get(this.buffer); + } else { + throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer."); + } + } + + public void setBuffer(byte[] buffer, int start, int len) { + if (buffer == null) { + throw new NullPointerException(); + } + + if (start < 0 || len < 0 || start + len > buffer.length) { + throw new IllegalArgumentException(); + } + + this.buffer = buffer; + this.position = start; + this.end = start + len; + } + + public void releaseArrays() { + this.buffer = null; + } + + // ---------------------------------------------------------------------------------------- + // Data Input + // ---------------------------------------------------------------------------------------- + + public int available() { + if (position < end) { + return end - position; + } else { + return 0; + } + } + + @Override + public boolean readBoolean() throws IOException { + if (this.position < this.end) { + return this.buffer[this.position++] != 0; + } else { + throw new EOFException(); + } + } + + @Override + public byte readByte() throws IOException { + if (this.position < this.end) { + return this.buffer[this.position++]; + } else { + throw new EOFException(); + } + } + + @Override + public char readChar() throws IOException { + if (this.position < this.end - 1) { + return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff)); + } else { + throw new EOFException(); + } + } + + @Override + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + @Override + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + @Override + public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + if (len >= 0) { + if (off <= b.length - len) { + if (this.position <= this.end - len) { + System.arraycopy(this.buffer, position, b, off, len); + position += len; + } else { + throw new EOFException(); + } + } else { + throw new ArrayIndexOutOfBoundsException(); + } + } else if (len < 0) { + throw new IllegalArgumentException("Length may not be negative."); + } + } + + @Override + public int readInt() throws IOException { + if (this.position >= 0 && this.position < this.end - 3) { + @SuppressWarnings("restriction") + int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position); + if (LITTLE_ENDIAN) { + value = Integer.reverseBytes(value); + } + + this.position += 4; + return value; + } else { + throw new EOFException(); + } + } + + @Override + public String readLine() throws IOException { + if (this.position < this.end) { + // read until a newline is found + StringBuilder bld = new StringBuilder(); + char curr = (char) readUnsignedByte(); + while (position < this.end && curr != '\n') { + bld.append(curr); + curr = (char) readUnsignedByte(); + } + // trim a trailing carriage return + int len = bld.length(); + if (len > 0 && bld.charAt(len - 1) == '\r') { + bld.setLength(len - 1); + } + String s = bld.toString(); + bld.setLength(0); + return s; + } else { + return null; + } + } + + @Override + public long readLong() throws IOException { + if (position >= 0 && position < this.end - 7) { + @SuppressWarnings("restriction") + long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position); + if (LITTLE_ENDIAN) { + value = Long.reverseBytes(value); + } + this.position += 8; + return value; + } else { + throw new EOFException(); + } + } + + @Override + public short readShort() throws IOException { + if (position >= 0 && position < this.end - 1) { + return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff)); + } else { + throw new EOFException(); + } + } + + @Override + public String readUTF() throws IOException { + int utflen = readUnsignedShort(); + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int c, char2, char3; + int count = 0; + int chararrCount = 0; + + readFully(bytearr, 0, utflen); + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) c; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx */ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException("malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } + + @Override + public int readUnsignedByte() throws IOException { + if (this.position < this.end) { + return (this.buffer[this.position++] & 0xff); + } else { + throw new EOFException(); + } + } + + @Override + public int readUnsignedShort() throws IOException { + if (this.position < this.end - 1) { + return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff); + } else { + throw new EOFException(); + } + } + + @Override + public int skipBytes(int n) throws IOException { + if (this.position <= this.end - n) { + this.position += n; + return n; + } else { + n = this.end - this.position; + this.position = this.end; + return n; + } + } + + @Override + public void skipBytesToRead(int numBytes) throws IOException { + int skippedBytes = skipBytes(numBytes); + + if (skippedBytes < numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null){ + throw new NullPointerException("Byte array b cannot be null."); + } + + if (off < 0){ + throw new IndexOutOfBoundsException("Offset cannot be negative."); + } + + if (len < 0){ + throw new IndexOutOfBoundsException("Length cannot be negative."); + } + + if (b.length - off < len){ + throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + + "."); + } + + if (this.position >= this.end) { + return -1; + } else { + int toRead = Math.min(this.end - this.position, len); + System.arraycopy(this.buffer, this.position, b, off, toRead); + this.position += toRead; + + return toRead; + } + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java new file mode 100644 index 0000000..7b8acb7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +/** + * A simple and efficient serializer for the {@link java.io.DataOutput} interface. + */ +public class DataOutputSerializer implements DataOutputView { + + private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class); + + private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; + + // ------------------------------------------------------------------------ + + private final byte[] startBuffer; + + private byte[] buffer; + + private int position; + + private ByteBuffer wrapper; + + // ------------------------------------------------------------------------ + + public DataOutputSerializer(int startSize) { + if (startSize < 1) { + throw new IllegalArgumentException(); + } + + this.startBuffer = new byte[startSize]; + this.buffer = this.startBuffer; + this.wrapper = ByteBuffer.wrap(buffer); + } + + public ByteBuffer wrapAsByteBuffer() { + this.wrapper.position(0); + this.wrapper.limit(this.position); + return this.wrapper; + } + + public byte[] getByteArray() { + return buffer; + } + + public byte[] getCopyOfBuffer() { + return Arrays.copyOf(buffer, position); + } + + public void clear() { + this.position = 0; + } + + public int length() { + return this.position; + } + + public void pruneBuffer() { + if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { + if (LOG.isDebugEnabled()) { + LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes."); + } + + this.buffer = this.startBuffer; + this.wrapper = ByteBuffer.wrap(this.buffer); + } + } + + @Override + public String toString() { + return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); + } + + // ---------------------------------------------------------------------------------------- + // Data Output + // ---------------------------------------------------------------------------------------- + + @Override + public void write(int b) throws IOException { + if (this.position >= this.buffer.length) { + resize(1); + } + this.buffer[this.position++] = (byte) (b & 0xff); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + if (this.position > this.buffer.length - len) { + resize(len); + } + System.arraycopy(b, off, this.buffer, this.position, len); + this.position += len; + } + + @Override + public void writeBoolean(boolean v) throws IOException { + write(v ? 1 : 0); + } + + @Override + public void writeByte(int v) throws IOException { + write(v); + } + + @Override + public void writeBytes(String s) throws IOException { + final int sLen = s.length(); + if (this.position >= this.buffer.length - sLen) { + resize(sLen); + } + + for (int i = 0; i < sLen; i++) { + writeByte(s.charAt(i)); + } + this.position += sLen; + } + + @Override + public void writeChar(int v) throws IOException { + if (this.position >= this.buffer.length - 1) { + resize(2); + } + this.buffer[this.position++] = (byte) (v >> 8); + this.buffer[this.position++] = (byte) v; + } + + @Override + public void writeChars(String s) throws IOException { + final int sLen = s.length(); + if (this.position >= this.buffer.length - 2 * sLen) { + resize(2 * sLen); + } + for (int i = 0; i < sLen; i++) { + writeChar(s.charAt(i)); + } + } + + @Override + public void writeDouble(double v) throws IOException { + writeLong(Double.doubleToLongBits(v)); + } + + @Override + public void writeFloat(float v) throws IOException { + writeInt(Float.floatToIntBits(v)); + } + + @SuppressWarnings("restriction") + @Override + public void writeInt(int v) throws IOException { + if (this.position >= this.buffer.length - 3) { + resize(4); + } + if (LITTLE_ENDIAN) { + v = Integer.reverseBytes(v); + } + UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); + this.position += 4; + } + + @SuppressWarnings("restriction") + @Override + public void writeLong(long v) throws IOException { + if (this.position >= this.buffer.length - 7) { + resize(8); + } + if (LITTLE_ENDIAN) { + v = Long.reverseBytes(v); + } + UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); + this.position += 8; + } + + @Override + public void writeShort(int v) throws IOException { + if (this.position >= this.buffer.length - 1) { + resize(2); + } + this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); + this.buffer[this.position++] = (byte) (v & 0xff); + } + + @Override + public void writeUTF(String str) throws IOException { + int strlen = str.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) { + throw new UTFDataFormatException("Encoded string is too long: " + utflen); + } + else if (this.position > this.buffer.length - utflen - 2) { + resize(utflen + 2); + } + + byte[] bytearr = this.buffer; + int count = this.position; + + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) (utflen & 0xFF); + + int i; + for (i = 0; i < strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + bytearr[count++] = (byte) c; + } + + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | (c & 0x3F)); + } + } + + this.position = count; + } + + private void resize(int minCapacityAdd) throws IOException { + int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); + byte[] nb; + try { + nb = new byte[newLen]; + } + catch (NegativeArraySizeException e) { + throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); + } + catch (OutOfMemoryError e) { + // this was too large to allocate, try the smaller size (if possible) + if (newLen > this.buffer.length + minCapacityAdd) { + newLen = this.buffer.length + minCapacityAdd; + try { + nb = new byte[newLen]; + } + catch (OutOfMemoryError ee) { + // still not possible. give an informative exception message that reports the size + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", ee); + } + } else { + throw new IOException("Failed to serialize element. Serialized size (> " + + newLen + " bytes) exceeds JVM heap space", e); + } + } + + System.arraycopy(this.buffer, 0, nb, 0, this.position); + this.buffer = nb; + this.wrapper = ByteBuffer.wrap(this.buffer); + } + + @Override + public void skipBytesToWrite(int numBytes) throws IOException { + if (buffer.length - this.position < numBytes){ + throw new EOFException("Could not skip " + numBytes + " bytes."); + } + + this.position += numBytes; + } + + @Override + public void write(DataInputView source, int numBytes) throws IOException { + if (buffer.length - this.position < numBytes){ + throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); + } + + source.readFully(this.buffer, this.position, numBytes); + this.position += numBytes; + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + @SuppressWarnings("restriction") + private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java new file mode 100644 index 0000000..26d407d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputDeserializerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test suite for the {@link DataInputDeserializer} class. + */ +public class DataInputDeserializerTest { + + @Test + public void testAvailable() throws Exception { + byte[] bytes; + DataInputDeserializer dis; + + bytes = new byte[] {}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + bytes = new byte[] {1, 2, 3}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + dis.readByte(); + Assert.assertEquals(2, dis.available()); + dis.readByte(); + Assert.assertEquals(1, dis.available()); + dis.readByte(); + Assert.assertEquals(0, dis.available()); + + try { + dis.readByte(); + Assert.fail("Did not throw expected IOException"); + } catch (IOException e) { + // ignore + } + Assert.assertEquals(0, dis.available()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java new file mode 100644 index 0000000..02a7ea7 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.testutils.serialization.types.Util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; + +/** + * Tests for the combination of {@link DataOutputSerializer} and {@link DataInputDeserializer}. + */ +public class DataInputOutputSerializerTest { + + @Test + public void testWrapAsByteBuffer() { + SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT); + + DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length()); + MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(randomInt.length()); + + try { + // empty buffer, read buffer should be empty + ByteBuffer wrapper = serializer.wrapAsByteBuffer(); + + Assert.assertEquals(0, wrapper.position()); + Assert.assertEquals(0, wrapper.limit()); + + // write to data output, read buffer should still be empty + randomInt.write(serializer); + + Assert.assertEquals(0, wrapper.position()); + Assert.assertEquals(0, wrapper.limit()); + + // get updated read buffer, read buffer should contain written data + wrapper = serializer.wrapAsByteBuffer(); + + Assert.assertEquals(0, wrapper.position()); + Assert.assertEquals(randomInt.length(), wrapper.limit()); + + // clear data output, read buffer should still contain written data + serializer.clear(); + + Assert.assertEquals(0, wrapper.position()); + Assert.assertEquals(randomInt.length(), wrapper.limit()); + + // get updated read buffer, should be empty + wrapper = serializer.wrapAsByteBuffer(); + + Assert.assertEquals(0, wrapper.position()); + Assert.assertEquals(0, wrapper.limit()); + + // write to data output and read back to memory + randomInt.write(serializer); + wrapper = serializer.wrapAsByteBuffer(); + + segment.put(0, wrapper, randomInt.length()); + + Assert.assertEquals(randomInt.length(), wrapper.position()); + Assert.assertEquals(randomInt.length(), wrapper.limit()); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("Test encountered an unexpected exception."); + } + } + + @Test + public void testRandomValuesWriteRead() { + final int numElements = 100000; + final ArrayDeque<SerializationTestType> reference = new ArrayDeque<>(); + + DataOutputSerializer serializer = new DataOutputSerializer(1); + + for (SerializationTestType value : Util.randomRecords(numElements)) { + reference.add(value); + + try { + value.write(serializer); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail("Test encountered an unexpected exception."); + } + } + + DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer()); + + for (SerializationTestType expected : reference) { + try { + SerializationTestType actual = expected.getClass().newInstance(); + actual.read(deserializer); + + Assert.assertEquals(expected, actual); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Test encountered an unexpected exception."); + } + } + + reference.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java new file mode 100644 index 0000000..d845a39 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/AsciiStringType.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.Random; + +public class AsciiStringType implements SerializationTestType { + + private static final int MAX_LEN = 1500; + + public String value; + + public AsciiStringType() { + this.value = ""; + } + + private AsciiStringType(String value) { + this.value = value; + } + + @Override + public AsciiStringType getRandom(Random rnd) { + final StringBuilder bld = new StringBuilder(); + final int len = rnd.nextInt(MAX_LEN + 1); + + for (int i = 0; i < len; i++) { + // 1--127 + bld.append((char) (rnd.nextInt(126) + 1)); + } + + return new AsciiStringType(bld.toString()); + } + + @Override + public int length() { + return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeUTF(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readUTF(); + } + + @Override + public int hashCode() { + return this.value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof AsciiStringType) { + AsciiStringType other = (AsciiStringType) obj; + return this.value.equals(other.value); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java new file mode 100644 index 0000000..cb696b1 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/BooleanType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class BooleanType implements SerializationTestType { + + private boolean value; + + public BooleanType() { + this.value = false; + } + + private BooleanType(boolean value) { + this.value = value; + } + + @Override + public BooleanType getRandom(Random rnd) { + return new BooleanType(rnd.nextBoolean()); + } + + @Override + public int length() { + return 1; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeBoolean(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readBoolean(); + } + + @Override + public int hashCode() { + return this.value ? 1 : 0; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof BooleanType) { + BooleanType other = (BooleanType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java new file mode 100644 index 0000000..c33e43d --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteArrayType.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class ByteArrayType implements SerializationTestType { + + private static final int MAX_LEN = 512 * 15; + + private byte[] data; + + public ByteArrayType() { + this.data = new byte[0]; + } + + public ByteArrayType(byte[] data) { + this.data = data; + } + + @Override + public ByteArrayType getRandom(Random rnd) { + final int len = rnd.nextInt(MAX_LEN) + 1; + final byte[] data = new byte[len]; + rnd.nextBytes(data); + return new ByteArrayType(data); + } + + @Override + public int length() { + return data.length + 4; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.data.length); + out.write(this.data); + } + + @Override + public void read(DataInputView in) throws IOException { + final int len = in.readInt(); + this.data = new byte[len]; + in.readFully(this.data); + } + + @Override + public int hashCode() { + return Arrays.hashCode(this.data); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ByteArrayType) { + ByteArrayType other = (ByteArrayType) obj; + return Arrays.equals(this.data, other.data); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java new file mode 100644 index 0000000..2d5a48a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteSubArrayType.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class ByteSubArrayType implements SerializationTestType { + + private static final int MAX_LEN = 512; + + private final byte[] data; + + private int len; + + public ByteSubArrayType() { + this.data = new byte[MAX_LEN]; + this.len = 0; + } + + @Override + public ByteSubArrayType getRandom(Random rnd) { + final int len = rnd.nextInt(MAX_LEN) + 1; + final ByteSubArrayType t = new ByteSubArrayType(); + t.len = len; + + final byte[] data = t.data; + for (int i = 0; i < len; i++) { + data[i] = (byte) rnd.nextInt(256); + } + + return t; + } + + @Override + public int length() { + return len + 4; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.len); + out.write(this.data, 0, this.len); + } + + @Override + public void read(DataInputView in) throws IOException { + this.len = in.readInt(); + in.readFully(this.data, 0, this.len); + } + + @Override + public int hashCode() { + final byte[] copy = new byte[this.len]; + System.arraycopy(this.data, 0, copy, 0, this.len); + return Arrays.hashCode(copy); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ByteSubArrayType) { + ByteSubArrayType other = (ByteSubArrayType) obj; + if (this.len == other.len) { + for (int i = 0; i < this.len; i++) { + if (this.data[i] != other.data[i]) { + return false; + } + } + return true; + } else { + return false; + } + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java new file mode 100644 index 0000000..8b843be --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ByteType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class ByteType implements SerializationTestType { + + private byte value; + + public ByteType() { + this.value = (byte) 0; + } + + private ByteType(byte value) { + this.value = value; + } + + @Override + public ByteType getRandom(Random rnd) { + return new ByteType((byte) rnd.nextInt(256)); + } + + @Override + public int length() { + return 1; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeByte(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readByte(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ByteType) { + ByteType other = (ByteType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java new file mode 100644 index 0000000..962de7f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/CharType.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class CharType implements SerializationTestType { + + private char value; + + public CharType() { + this.value = 0; + } + + private CharType(char value) { + this.value = value; + } + + @Override + public CharType getRandom(Random rnd) { + return new CharType((char) rnd.nextInt(10000)); + } + + @Override + public int length() { + return 2; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeChar(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readChar(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof CharType) { + CharType other = (CharType) obj; + return this.value == other.value; + } else { + return false; + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java new file mode 100644 index 0000000..7119c34 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/DoubleType.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class DoubleType implements SerializationTestType { + + private double value; + + public DoubleType() { + this.value = 0; + } + + private DoubleType(double value) { + this.value = value; + } + + @Override + public DoubleType getRandom(Random rnd) { + return new DoubleType(rnd.nextDouble()); + } + + @Override + public int length() { + return 8; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeDouble(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readDouble(); + } + + @Override + public int hashCode() { + final long l = Double.doubleToLongBits(this.value); + return (int) (l ^ l >>> 32); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DoubleType) { + DoubleType other = (DoubleType) obj; + return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java new file mode 100644 index 0000000..9fa6e63 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/FloatType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class FloatType implements SerializationTestType { + + private float value; + + public FloatType() { + this.value = 0; + } + + private FloatType(float value) { + this.value = value; + } + + @Override + public FloatType getRandom(Random rnd) { + return new FloatType(rnd.nextFloat()); + } + + @Override + public int length() { + return 4; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeFloat(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readFloat(); + } + + @Override + public int hashCode() { + return Float.floatToIntBits(this.value); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FloatType) { + FloatType other = (FloatType) obj; + return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value); + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java new file mode 100644 index 0000000..52313ab --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/IntType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class IntType implements SerializationTestType { + + private int value; + + public IntType() { + this.value = 0; + } + + public IntType(int value) { + this.value = value; + } + + @Override + public IntType getRandom(Random rnd) { + return new IntType(rnd.nextInt()); + } + + @Override + public int length() { + return 4; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readInt(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof IntType) { + IntType other = (IntType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java new file mode 100644 index 0000000..3e47b4b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/LongType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class LongType implements SerializationTestType { + + private long value; + + public LongType() { + this.value = 0; + } + + private LongType(long value) { + this.value = value; + } + + @Override + public LongType getRandom(Random rnd) { + return new LongType(rnd.nextLong()); + } + + @Override + public int length() { + return 8; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeLong(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readLong(); + } + + @Override + public int hashCode() { + return (int) (this.value ^ this.value >>> 32); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof LongType) { + LongType other = (LongType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java new file mode 100644 index 0000000..1edd796 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.util.Random; + +import org.apache.flink.core.io.IOReadableWritable; + +public interface SerializationTestType extends IOReadableWritable { + + public SerializationTestType getRandom(Random rnd); + + public int length(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java new file mode 100644 index 0000000..392e15a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/SerializationTestTypeFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +public enum SerializationTestTypeFactory { + + BOOLEAN(new BooleanType()), + BYTE_ARRAY(new ByteArrayType()), + BYTE_SUB_ARRAY(new ByteSubArrayType()), + BYTE(new ByteType()), + CHAR(new CharType()), + DOUBLE(new DoubleType()), + FLOAT(new FloatType()), + INT(new IntType()), + LONG(new LongType()), + SHORT(new ShortType()), + UNSIGNED_BYTE(new UnsignedByteType()), + UNSIGNED_SHORT(new UnsignedShortType()), + STRING(new AsciiStringType()); + + private final SerializationTestType factory; + + SerializationTestTypeFactory(SerializationTestType type) { + this.factory = type; + } + + public SerializationTestType factory() { + return this.factory; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java new file mode 100644 index 0000000..b5b3c27 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/ShortType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class ShortType implements SerializationTestType { + + private short value; + + public ShortType() { + this.value = (short) 0; + } + + private ShortType(short value) { + this.value = value; + } + + @Override + public ShortType getRandom(Random rnd) { + return new ShortType((short) rnd.nextInt(65536)); + } + + @Override + public int length() { + return 2; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeShort(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readShort(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ShortType) { + ShortType other = (ShortType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java new file mode 100644 index 0000000..29b897a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedByteType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class UnsignedByteType implements SerializationTestType { + + private int value; + + public UnsignedByteType() { + this.value = 0; + } + + private UnsignedByteType(int value) { + this.value = value; + } + + @Override + public UnsignedByteType getRandom(Random rnd) { + return new UnsignedByteType(rnd.nextInt(128) + 128); + } + + @Override + public int length() { + return 1; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeByte(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readUnsignedByte(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof UnsignedByteType) { + UnsignedByteType other = (UnsignedByteType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java new file mode 100644 index 0000000..47d0156 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/UnsignedShortType.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.testutils.serialization.types; + +import java.io.IOException; +import java.util.Random; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +public class UnsignedShortType implements SerializationTestType { + + private int value; + + public UnsignedShortType() { + this.value = 0; + } + + private UnsignedShortType(int value) { + this.value = value; + } + + @Override + public UnsignedShortType getRandom(Random rnd) { + return new UnsignedShortType(rnd.nextInt(32768) + 32768); + } + + @Override + public int length() { + return 2; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeShort(this.value); + } + + @Override + public void read(DataInputView in) throws IOException { + this.value = in.readUnsignedShort(); + } + + @Override + public int hashCode() { + return this.value; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof UnsignedShortType) { + UnsignedShortType other = (UnsignedShortType) obj; + return this.value == other.value; + } else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java new file mode 100644 index 0000000..b34701f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/testutils/serialization/types/Util.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.testutils.serialization.types; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Random; + +/** + * Utility class to help serialization for testing. + */ +public final class Util { + + private static final long SEED = 64871654635745873L; + + private static Random random = new Random(SEED); + + public static SerializationTestType randomRecord(SerializationTestTypeFactory type) { + return type.factory().getRandom(Util.random); + } + + public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) { + + return new MockRecords(numElements) { + @Override + protected SerializationTestType getRecord() { + return type.factory().getRandom(Util.random); + } + }; + } + + public static MockRecords randomRecords(final int numElements) { + + return new MockRecords(numElements) { + @Override + protected SerializationTestType getRecord() { + // select random test type factory + SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values(); + int i = Util.random.nextInt(types.length); + + return types[i].factory().getRandom(Util.random); + } + }; + } + + // ----------------------------------------------------------------------------------------------------------------- + public abstract static class MockRecords implements Iterable<SerializationTestType> { + + private int numRecords; + + public MockRecords(int numRecords) { + this.numRecords = numRecords; + } + + @Override + public Iterator<SerializationTestType> iterator() { + return new Iterator<SerializationTestType>() { + @Override + public boolean hasNext() { + return numRecords > 0; + } + + @Override + public SerializationTestType next() { + if (numRecords > 0) { + numRecords--; + + return getRecord(); + } + + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + abstract protected SerializationTestType getRecord(); + } + + /** + * No instantiation. + */ + private Util() { + throw new RuntimeException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java deleted file mode 100644 index 878df85..0000000 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state.serialization; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.MemoryUtils; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -/** - * A simple and efficient deserializer for the {@link java.io.DataInput} interface. - * - * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b> - */ -public class DataInputDeserializer implements DataInputView, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - // ------------------------------------------------------------------------ - - private byte[] buffer; - - private int end; - - private int position; - - // ------------------------------------------------------------------------ - - public DataInputDeserializer() {} - - public DataInputDeserializer(byte[] buffer) { - setBuffer(buffer, 0, buffer.length); - } - - public DataInputDeserializer(byte[] buffer, int start, int len) { - setBuffer(buffer, start, len); - } - - public DataInputDeserializer(ByteBuffer buffer) { - setBuffer(buffer); - } - - // ------------------------------------------------------------------------ - // Changing buffers - // ------------------------------------------------------------------------ - - public void setBuffer(ByteBuffer buffer) { - if (buffer.hasArray()) { - this.buffer = buffer.array(); - this.position = buffer.arrayOffset() + buffer.position(); - this.end = this.position + buffer.remaining(); - } else if (buffer.isDirect()) { - this.buffer = new byte[buffer.remaining()]; - this.position = 0; - this.end = this.buffer.length; - - buffer.get(this.buffer); - } else { - throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer."); - } - } - - public void setBuffer(byte[] buffer, int start, int len) { - if (buffer == null) { - throw new NullPointerException(); - } - - if (start < 0 || len < 0 || start + len > buffer.length) { - throw new IllegalArgumentException(); - } - - this.buffer = buffer; - this.position = start; - this.end = start + len; - } - - public void releaseArrays() { - this.buffer = null; - } - - // ---------------------------------------------------------------------------------------- - // Data Input - // ---------------------------------------------------------------------------------------- - - public int available() { - if (position < end) { - return end - position; - } else { - return 0; - } - } - - @Override - public boolean readBoolean() throws IOException { - if (this.position < this.end) { - return this.buffer[this.position++] != 0; - } else { - throw new EOFException(); - } - } - - @Override - public byte readByte() throws IOException { - if (this.position < this.end) { - return this.buffer[this.position++]; - } else { - throw new EOFException(); - } - } - - @Override - public char readChar() throws IOException { - if (this.position < this.end - 1) { - return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff)); - } else { - throw new EOFException(); - } - } - - @Override - public double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - @Override - public float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - @Override - public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - if (len >= 0) { - if (off <= b.length - len) { - if (this.position <= this.end - len) { - System.arraycopy(this.buffer, position, b, off, len); - position += len; - } else { - throw new EOFException(); - } - } else { - throw new ArrayIndexOutOfBoundsException(); - } - } else if (len < 0) { - throw new IllegalArgumentException("Length may not be negative."); - } - } - - @Override - public int readInt() throws IOException { - if (this.position >= 0 && this.position < this.end - 3) { - @SuppressWarnings("restriction") - int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position); - if (LITTLE_ENDIAN) { - value = Integer.reverseBytes(value); - } - - this.position += 4; - return value; - } else { - throw new EOFException(); - } - } - - @Override - public String readLine() throws IOException { - if (this.position < this.end) { - // read until a newline is found - StringBuilder bld = new StringBuilder(); - char curr = (char) readUnsignedByte(); - while (position < this.end && curr != '\n') { - bld.append(curr); - curr = (char) readUnsignedByte(); - } - // trim a trailing carriage return - int len = bld.length(); - if (len > 0 && bld.charAt(len - 1) == '\r') { - bld.setLength(len - 1); - } - String s = bld.toString(); - bld.setLength(0); - return s; - } else { - return null; - } - } - - @Override - public long readLong() throws IOException { - if (position >= 0 && position < this.end - 7) { - @SuppressWarnings("restriction") - long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position); - if (LITTLE_ENDIAN) { - value = Long.reverseBytes(value); - } - this.position += 8; - return value; - } else { - throw new EOFException(); - } - } - - @Override - public short readShort() throws IOException { - if (position >= 0 && position < this.end - 1) { - return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff)); - } else { - throw new EOFException(); - } - } - - @Override - public String readUTF() throws IOException { - int utflen = readUnsignedShort(); - byte[] bytearr = new byte[utflen]; - char[] chararr = new char[utflen]; - - int c, char2, char3; - int count = 0; - int chararrCount = 0; - - readFully(bytearr, 0, utflen); - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) { - break; - } - count++; - chararr[chararrCount++] = (char) c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - /* 0xxxxxxx */ - count++; - chararr[chararrCount++] = (char) c; - break; - case 12: - case 13: - /* 110x xxxx 10xx xxxx */ - count += 2; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 1]; - if ((char2 & 0xC0) != 0x80) { - throw new UTFDataFormatException("malformed input around byte " + count); - } - chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 2]; - char3 = (int) bytearr[count - 1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { - throw new UTFDataFormatException("malformed input around byte " + (count - 1)); - } - chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); - break; - default: - /* 10xx xxxx, 1111 xxxx */ - throw new UTFDataFormatException("malformed input around byte " + count); - } - } - // The number of chars produced may be less than utflen - return new String(chararr, 0, chararrCount); - } - - @Override - public int readUnsignedByte() throws IOException { - if (this.position < this.end) { - return (this.buffer[this.position++] & 0xff); - } else { - throw new EOFException(); - } - } - - @Override - public int readUnsignedShort() throws IOException { - if (this.position < this.end - 1) { - return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff); - } else { - throw new EOFException(); - } - } - - @Override - public int skipBytes(int n) throws IOException { - if (this.position <= this.end - n) { - this.position += n; - return n; - } else { - n = this.end - this.position; - this.position = this.end; - return n; - } - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - int skippedBytes = skipBytes(numBytes); - - if (skippedBytes < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null){ - throw new NullPointerException("Byte array b cannot be null."); - } - - if (off < 0){ - throw new IndexOutOfBoundsException("Offset cannot be negative."); - } - - if (len < 0){ - throw new IndexOutOfBoundsException("Length cannot be negative."); - } - - if (b.length - off < len){ - throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + - "."); - } - - if (this.position >= this.end) { - return -1; - } else { - int toRead = Math.min(this.end - this.position, len); - System.arraycopy(this.buffer, this.position, b, off, toRead); - this.position += toRead; - - return toRead; - } - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); -}
