http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java deleted file mode 100644 index 5811c91..0000000 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataOutputSerializer.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.client.state.serialization; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Arrays; - -/** - * A simple and efficient serializer for the {@link java.io.DataOutput} interface. - * - * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b> - */ -public class DataOutputSerializer implements DataOutputView { - - private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class); - - private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; - - // ------------------------------------------------------------------------ - - private final byte[] startBuffer; - - private byte[] buffer; - - private int position; - - private ByteBuffer wrapper; - - // ------------------------------------------------------------------------ - - public DataOutputSerializer(int startSize) { - if (startSize < 1) { - throw new IllegalArgumentException(); - } - - this.startBuffer = new byte[startSize]; - this.buffer = this.startBuffer; - this.wrapper = ByteBuffer.wrap(buffer); - } - - public ByteBuffer wrapAsByteBuffer() { - this.wrapper.position(0); - this.wrapper.limit(this.position); - return this.wrapper; - } - - public byte[] getByteArray() { - return buffer; - } - - public byte[] getCopyOfBuffer() { - return Arrays.copyOf(buffer, position); - } - - public void clear() { - this.position = 0; - } - - public int length() { - return this.position; - } - - public void pruneBuffer() { - if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { - if (LOG.isDebugEnabled()) { - LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes."); - } - - this.buffer = this.startBuffer; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - } - - @Override - public String toString() { - return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); - } - - // ---------------------------------------------------------------------------------------- - // Data Output - // ---------------------------------------------------------------------------------------- - - @Override - public void write(int b) throws IOException { - if (this.position >= this.buffer.length) { - resize(1); - } - this.buffer[this.position++] = (byte) (b & 0xff); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - if (this.position > this.buffer.length - len) { - resize(len); - } - System.arraycopy(b, off, this.buffer, this.position, len); - this.position += len; - } - - @Override - public void writeBoolean(boolean v) throws IOException { - write(v ? 1 : 0); - } - - @Override - public void writeByte(int v) throws IOException { - write(v); - } - - @Override - public void writeBytes(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - sLen) { - resize(sLen); - } - - for (int i = 0; i < sLen; i++) { - writeByte(s.charAt(i)); - } - this.position += sLen; - } - - @Override - public void writeChar(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) (v >> 8); - this.buffer[this.position++] = (byte) v; - } - - @Override - public void writeChars(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - 2 * sLen) { - resize(2 * sLen); - } - - for (int i = 0; i < sLen; i++) { - writeChar(s.charAt(i)); - } - } - - @Override - public void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - @Override - public void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - @SuppressWarnings("restriction") - @Override - public void writeInt(int v) throws IOException { - if (this.position >= this.buffer.length - 3) { - resize(4); - } - if (LITTLE_ENDIAN) { - v = Integer.reverseBytes(v); - } - UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); - this.position += 4; - } - - @SuppressWarnings("restriction") - @Override - public void writeLong(long v) throws IOException { - if (this.position >= this.buffer.length - 7) { - resize(8); - } - if (LITTLE_ENDIAN) { - v = Long.reverseBytes(v); - } - UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); - this.position += 8; - } - - @Override - public void writeShort(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); - this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff); - } - - @Override - public void writeUTF(String str) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("Encoded string is too long: " + utflen); - } - else if (this.position > this.buffer.length - utflen - 2) { - resize(utflen + 2); - } - - byte[] bytearr = this.buffer; - int count = this.position; - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - - this.position = count; - } - - private void resize(int minCapacityAdd) throws IOException { - int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - byte[] nb; - try { - nb = new byte[newLen]; - } - catch (NegativeArraySizeException e) { - throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); - } - catch (OutOfMemoryError e) { - // this was too large to allocate, try the smaller size (if possible) - if (newLen > this.buffer.length + minCapacityAdd) { - newLen = this.buffer.length + minCapacityAdd; - try { - nb = new byte[newLen]; - } - catch (OutOfMemoryError ee) { - // still not possible. give an informative exception message that reports the size - throw new IOException("Failed to serialize element. Serialized size (> " - + newLen + " bytes) exceeds JVM heap space", ee); - } - } else { - throw new IOException("Failed to serialize element. Serialized size (> " - + newLen + " bytes) exceeds JVM heap space", e); - } - } - - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - if (buffer.length - this.position < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - - this.position += numBytes; - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - if (buffer.length - this.position < numBytes){ - throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); - } - - source.readFully(this.buffer, this.position, numBytes); - this.position += numBytes; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); -}
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java index 4c69483..4a64678 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java @@ -20,6 +20,8 @@ package org.apache.flink.queryablestate.client.state.serialization; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; import java.nio.ByteBuffer; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 8f2c8fd..04a7a21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -22,8 +22,8 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.EOFException; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index aa8133f..742410e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 6c541a9..87b9e4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -26,7 +26,7 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; /** * Record serializer which serializes the complete record to an intermediate http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 7c213b4..74260d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -23,7 +23,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.util.StringUtils; import java.io.BufferedInputStream; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java index 1791fe1..d821e0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -19,15 +19,15 @@ package org.apache.flink.runtime.metrics.dump; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.Metric; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java deleted file mode 100644 index 4e8871a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.MemoryUtils; - -/** - * A simple and efficient deserializer for the {@link java.io.DataInput} interface. - */ -public class DataInputDeserializer implements DataInputView, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - // ------------------------------------------------------------------------ - - private byte[] buffer; - - private int end; - - private int position; - - // ------------------------------------------------------------------------ - - public DataInputDeserializer() {} - - public DataInputDeserializer(byte[] buffer) { - setBuffer(buffer, 0, buffer.length); - } - - public DataInputDeserializer(byte[] buffer, int start, int len) { - setBuffer(buffer, start, len); - } - - public DataInputDeserializer(ByteBuffer buffer) { - setBuffer(buffer); - } - - // ------------------------------------------------------------------------ - // Changing buffers - // ------------------------------------------------------------------------ - - public void setBuffer(ByteBuffer buffer) { - if (buffer.hasArray()) { - this.buffer = buffer.array(); - this.position = buffer.arrayOffset() + buffer.position(); - this.end = this.position + buffer.remaining(); - } else if (buffer.isDirect()) { - this.buffer = new byte[buffer.remaining()]; - this.position = 0; - this.end = this.buffer.length; - - buffer.get(this.buffer); - } else { - throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer."); - } - } - - public void setBuffer(byte[] buffer, int start, int len) { - if (buffer == null) { - throw new NullPointerException(); - } - - if (start < 0 || len < 0 || start + len > buffer.length) { - throw new IllegalArgumentException(); - } - - this.buffer = buffer; - this.position = start; - this.end = start + len; - } - - public void releaseArrays() { - this.buffer = null; - } - - // ---------------------------------------------------------------------------------------- - // Data Input - // ---------------------------------------------------------------------------------------- - - public int available() { - if (position < end) { - return end - position; - } else { - return 0; - } - } - - @Override - public boolean readBoolean() throws IOException { - if (this.position < this.end) { - return this.buffer[this.position++] != 0; - } else { - throw new EOFException(); - } - } - - @Override - public byte readByte() throws IOException { - if (this.position < this.end) { - return this.buffer[this.position++]; - } else { - throw new EOFException(); - } - } - - @Override - public char readChar() throws IOException { - if (this.position < this.end - 1) { - return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff)); - } else { - throw new EOFException(); - } - } - - @Override - public double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - @Override - public float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - @Override - public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - @Override - public void readFully(byte[] b, int off, int len) throws IOException { - if (len >= 0) { - if (off <= b.length - len) { - if (this.position <= this.end - len) { - System.arraycopy(this.buffer, position, b, off, len); - position += len; - } else { - throw new EOFException(); - } - } else { - throw new ArrayIndexOutOfBoundsException(); - } - } else if (len < 0) { - throw new IllegalArgumentException("Length may not be negative."); - } - } - - @Override - public int readInt() throws IOException { - if (this.position >= 0 && this.position < this.end - 3) { - @SuppressWarnings("restriction") - int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position); - if (LITTLE_ENDIAN) { - value = Integer.reverseBytes(value); - } - - this.position += 4; - return value; - } else { - throw new EOFException(); - } - } - - @Override - public String readLine() throws IOException { - if (this.position < this.end) { - // read until a newline is found - StringBuilder bld = new StringBuilder(); - char curr = (char) readUnsignedByte(); - while (position < this.end && curr != '\n') { - bld.append(curr); - curr = (char) readUnsignedByte(); - } - // trim a trailing carriage return - int len = bld.length(); - if (len > 0 && bld.charAt(len - 1) == '\r') { - bld.setLength(len - 1); - } - String s = bld.toString(); - bld.setLength(0); - return s; - } else { - return null; - } - } - - @Override - public long readLong() throws IOException { - if (position >= 0 && position < this.end - 7) { - @SuppressWarnings("restriction") - long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position); - if (LITTLE_ENDIAN) { - value = Long.reverseBytes(value); - } - this.position += 8; - return value; - } else { - throw new EOFException(); - } - } - - @Override - public short readShort() throws IOException { - if (position >= 0 && position < this.end - 1) { - return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff)); - } else { - throw new EOFException(); - } - } - - @Override - public String readUTF() throws IOException { - int utflen = readUnsignedShort(); - byte[] bytearr = new byte[utflen]; - char[] chararr = new char[utflen]; - - int c, char2, char3; - int count = 0; - int chararr_count = 0; - - readFully(bytearr, 0, utflen); - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - if (c > 127) { - break; - } - count++; - chararr[chararr_count++] = (char) c; - } - - while (count < utflen) { - c = (int) bytearr[count] & 0xff; - switch (c >> 4) { - case 0: - case 1: - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - /* 0xxxxxxx */ - count++; - chararr[chararr_count++] = (char) c; - break; - case 12: - case 13: - /* 110x xxxx 10xx xxxx */ - count += 2; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 1]; - if ((char2 & 0xC0) != 0x80) { - throw new UTFDataFormatException("malformed input around byte " + count); - } - chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); - break; - case 14: - /* 1110 xxxx 10xx xxxx 10xx xxxx */ - count += 3; - if (count > utflen) { - throw new UTFDataFormatException("malformed input: partial character at end"); - } - char2 = (int) bytearr[count - 2]; - char3 = (int) bytearr[count - 1]; - if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { - throw new UTFDataFormatException("malformed input around byte " + (count - 1)); - } - chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); - break; - default: - /* 10xx xxxx, 1111 xxxx */ - throw new UTFDataFormatException("malformed input around byte " + count); - } - } - // The number of chars produced may be less than utflen - return new String(chararr, 0, chararr_count); - } - - @Override - public int readUnsignedByte() throws IOException { - if (this.position < this.end) { - return (this.buffer[this.position++] & 0xff); - } else { - throw new EOFException(); - } - } - - @Override - public int readUnsignedShort() throws IOException { - if (this.position < this.end - 1) { - return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff); - } else { - throw new EOFException(); - } - } - - @Override - public int skipBytes(int n) throws IOException { - if (this.position <= this.end - n) { - this.position += n; - return n; - } else { - n = this.end - this.position; - this.position = this.end; - return n; - } - } - - @Override - public void skipBytesToRead(int numBytes) throws IOException { - int skippedBytes = skipBytes(numBytes); - - if (skippedBytes < numBytes){ - throw new EOFException("Could not skip " + numBytes +" bytes."); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null){ - throw new NullPointerException("Byte array b cannot be null."); - } - - if (off < 0){ - throw new IndexOutOfBoundsException("Offset cannot be negative."); - } - - if (len < 0){ - throw new IndexOutOfBoundsException("Length cannot be negative."); - } - - if (b.length - off < len){ - throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" + - "."); - } - - if (this.position >= this.end) { - return -1; - } else { - int toRead = Math.min(this.end-this.position, len); - System.arraycopy(this.buffer,this.position,b,off,toRead); - this.position += toRead; - - return toRead; - } - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java deleted file mode 100644 index 4f1cf77..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Arrays; - -/** - * A simple and efficient serializer for the {@link java.io.DataOutput} interface. - */ -public class DataOutputSerializer implements DataOutputView { - - private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class); - - private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024; - - // ------------------------------------------------------------------------ - - private final byte[] startBuffer; - - private byte[] buffer; - - private int position; - - private ByteBuffer wrapper; - - // ------------------------------------------------------------------------ - - public DataOutputSerializer(int startSize) { - if (startSize < 1) { - throw new IllegalArgumentException(); - } - - this.startBuffer = new byte[startSize]; - this.buffer = this.startBuffer; - this.wrapper = ByteBuffer.wrap(buffer); - } - - public ByteBuffer wrapAsByteBuffer() { - this.wrapper.position(0); - this.wrapper.limit(this.position); - return this.wrapper; - } - - public byte[] getByteArray() { - return buffer; - } - - public byte[] getCopyOfBuffer() { - return Arrays.copyOf(buffer, position); - } - - public void clear() { - this.position = 0; - } - - public int length() { - return this.position; - } - - public void pruneBuffer() { - if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) { - if (LOG.isDebugEnabled()) { - LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes."); - } - - this.buffer = this.startBuffer; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - } - - @Override - public String toString() { - return String.format("[pos=%d cap=%d]", this.position, this.buffer.length); - } - - // ---------------------------------------------------------------------------------------- - // Data Output - // ---------------------------------------------------------------------------------------- - - @Override - public void write(int b) throws IOException { - if (this.position >= this.buffer.length) { - resize(1); - } - this.buffer[this.position++] = (byte) (b & 0xff); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if (len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - if (this.position > this.buffer.length - len) { - resize(len); - } - System.arraycopy(b, off, this.buffer, this.position, len); - this.position += len; - } - - @Override - public void writeBoolean(boolean v) throws IOException { - write(v ? 1 : 0); - } - - @Override - public void writeByte(int v) throws IOException { - write(v); - } - - @Override - public void writeBytes(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - sLen) { - resize(sLen); - } - - for (int i = 0; i < sLen; i++) { - writeByte(s.charAt(i)); - } - this.position += sLen; - } - - @Override - public void writeChar(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) (v >> 8); - this.buffer[this.position++] = (byte) v; - } - - @Override - public void writeChars(String s) throws IOException { - final int sLen = s.length(); - if (this.position >= this.buffer.length - 2*sLen) { - resize(2*sLen); - } - for (int i = 0; i < sLen; i++) { - writeChar(s.charAt(i)); - } - } - - @Override - public void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - @Override - public void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - @SuppressWarnings("restriction") - @Override - public void writeInt(int v) throws IOException { - if (this.position >= this.buffer.length - 3) { - resize(4); - } - if (LITTLE_ENDIAN) { - v = Integer.reverseBytes(v); - } - UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v); - this.position += 4; - } - - @SuppressWarnings("restriction") - @Override - public void writeLong(long v) throws IOException { - if (this.position >= this.buffer.length - 7) { - resize(8); - } - if (LITTLE_ENDIAN) { - v = Long.reverseBytes(v); - } - UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v); - this.position += 8; - } - - @Override - public void writeShort(int v) throws IOException { - if (this.position >= this.buffer.length - 1) { - resize(2); - } - this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff); - this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff); - } - - @Override - public void writeUTF(String str) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("Encoded string is too long: " + utflen); - } - else if (this.position > this.buffer.length - utflen - 2) { - resize(utflen + 2); - } - - byte[] bytearr = this.buffer; - int count = this.position; - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F)); - } - } - - this.position = count; - } - - - private void resize(int minCapacityAdd) throws IOException { - int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); - byte[] nb; - try { - nb = new byte[newLen]; - } - catch (NegativeArraySizeException e) { - throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); - } - catch (OutOfMemoryError e) { - // this was too large to allocate, try the smaller size (if possible) - if (newLen > this.buffer.length + minCapacityAdd) { - newLen = this.buffer.length + minCapacityAdd; - try { - nb = new byte[newLen]; - } - catch (OutOfMemoryError ee) { - // still not possible. give an informative exception message that reports the size - throw new IOException("Failed to serialize element. Serialized size (> " - + newLen + " bytes) exceeds JVM heap space", ee); - } - } else { - throw new IOException("Failed to serialize element. Serialized size (> " - + newLen + " bytes) exceeds JVM heap space", e); - } - } - - System.arraycopy(this.buffer, 0, nb, 0, this.position); - this.buffer = nb; - this.wrapper = ByteBuffer.wrap(this.buffer); - } - - @Override - public void skipBytesToWrite(int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not skip " + numBytes + " bytes."); - } - - this.position += numBytes; - } - - @Override - public void write(DataInputView source, int numBytes) throws IOException { - if(buffer.length - this.position < numBytes){ - throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); - } - - source.readFully(this.buffer, this.position, numBytes); - this.position += numBytes; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; - - @SuppressWarnings("restriction") - private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); - - private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN); -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index c8bffb5..7e652ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.io.network.api; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java index 8455402..aad7ee1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.Util; +import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.testutils.serialization.types.Util; import org.apache.flink.runtime.memory.AbstractPagedInputView; import org.apache.flink.runtime.memory.AbstractPagedOutputView; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 9d0ee67..f988c55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.Util; +import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.testutils.serialization.types.Util; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index b7bcb3e..fe9a386 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -22,9 +22,9 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.Util; +import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.testutils.serialization.types.Util; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.junit.Assert; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java deleted file mode 100644 index 74de096..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/AsciiStringType.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; -import java.util.Random; - -public class AsciiStringType implements SerializationTestType { - - private static final int MAX_LEN = 1500; - - public String value; - - public AsciiStringType() { - this.value = ""; - } - - private AsciiStringType(String value) { - this.value = value; - } - - @Override - public AsciiStringType getRandom(Random rnd) { - final StringBuilder bld = new StringBuilder(); - final int len = rnd.nextInt(MAX_LEN + 1); - - for (int i = 0; i < len; i++) { - // 1--127 - bld.append((char) (rnd.nextInt(126) + 1)); - } - - return new AsciiStringType(bld.toString()); - } - - @Override - public int length() { - return value.getBytes(ConfigConstants.DEFAULT_CHARSET).length + 2; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeUTF(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readUTF(); - } - - @Override - public int hashCode() { - return this.value.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof AsciiStringType) { - AsciiStringType other = (AsciiStringType) obj; - return this.value.equals(other.value); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java deleted file mode 100644 index 66b099d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/BooleanType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class BooleanType implements SerializationTestType { - - private boolean value; - - public BooleanType() { - this.value = false; - } - - private BooleanType(boolean value) { - this.value = value; - } - - @Override - public BooleanType getRandom(Random rnd) { - return new BooleanType(rnd.nextBoolean()); - } - - @Override - public int length() { - return 1; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeBoolean(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readBoolean(); - } - - @Override - public int hashCode() { - return this.value ? 1 : 0; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof BooleanType) { - BooleanType other = (BooleanType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java deleted file mode 100644 index 66fa22c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteArrayType.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class ByteArrayType implements SerializationTestType { - - private static final int MAX_LEN = 512 * 15; - - private byte[] data; - - public ByteArrayType() { - this.data = new byte[0]; - } - - public ByteArrayType(byte[] data) { - this.data = data; - } - - @Override - public ByteArrayType getRandom(Random rnd) { - final int len = rnd.nextInt(MAX_LEN) + 1; - final byte[] data = new byte[len]; - rnd.nextBytes(data); - return new ByteArrayType(data); - } - - @Override - public int length() { - return data.length + 4; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.data.length); - out.write(this.data); - } - - @Override - public void read(DataInputView in) throws IOException { - final int len = in.readInt(); - this.data = new byte[len]; - in.readFully(this.data); - } - - @Override - public int hashCode() { - return Arrays.hashCode(this.data); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ByteArrayType) { - ByteArrayType other = (ByteArrayType) obj; - return Arrays.equals(this.data, other.data); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java deleted file mode 100644 index 6431f14..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteSubArrayType.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class ByteSubArrayType implements SerializationTestType { - - private static final int MAX_LEN = 512; - - private final byte[] data; - - private int len; - - public ByteSubArrayType() { - this.data = new byte[MAX_LEN]; - this.len = 0; - } - - @Override - public ByteSubArrayType getRandom(Random rnd) { - final int len = rnd.nextInt(MAX_LEN) + 1; - final ByteSubArrayType t = new ByteSubArrayType(); - t.len = len; - - final byte[] data = t.data; - for (int i = 0; i < len; i++) { - data[i] = (byte) rnd.nextInt(256); - } - - return t; - } - - @Override - public int length() { - return len + 4; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.len); - out.write(this.data, 0, this.len); - } - - @Override - public void read(DataInputView in) throws IOException { - this.len = in.readInt(); - in.readFully(this.data, 0, this.len); - } - - @Override - public int hashCode() { - final byte[] copy = new byte[this.len]; - System.arraycopy(this.data, 0, copy, 0, this.len); - return Arrays.hashCode(copy); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ByteSubArrayType) { - ByteSubArrayType other = (ByteSubArrayType) obj; - if (this.len == other.len) { - for (int i = 0; i < this.len; i++) { - if (this.data[i] != other.data[i]) { - return false; - } - } - return true; - } else { - return false; - } - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java deleted file mode 100644 index 87fd7c0..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ByteType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class ByteType implements SerializationTestType { - - private byte value; - - public ByteType() { - this.value = (byte) 0; - } - - private ByteType(byte value) { - this.value = value; - } - - @Override - public ByteType getRandom(Random rnd) { - return new ByteType((byte) rnd.nextInt(256)); - } - - @Override - public int length() { - return 1; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeByte(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readByte(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ByteType) { - ByteType other = (ByteType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java deleted file mode 100644 index b162ea0..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/CharType.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class CharType implements SerializationTestType { - - private char value; - - public CharType() { - this.value = 0; - } - - private CharType(char value) { - this.value = value; - } - - @Override - public CharType getRandom(Random rnd) { - return new CharType((char) rnd.nextInt(10000)); - } - - @Override - public int length() { - return 2; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeChar(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readChar(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof CharType) { - CharType other = (CharType) obj; - return this.value == other.value; - } else { - return false; - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java deleted file mode 100644 index 654b685..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/DoubleType.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class DoubleType implements SerializationTestType { - - private double value; - - public DoubleType() { - this.value = 0; - } - - private DoubleType(double value) { - this.value = value; - } - - @Override - public DoubleType getRandom(Random rnd) { - return new DoubleType(rnd.nextDouble()); - } - - @Override - public int length() { - return 8; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeDouble(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readDouble(); - } - - @Override - public int hashCode() { - final long l = Double.doubleToLongBits(this.value); - return (int) (l ^ l >>> 32); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof DoubleType) { - DoubleType other = (DoubleType) obj; - return Double.doubleToLongBits(this.value) == Double.doubleToLongBits(other.value); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java deleted file mode 100644 index 653be45..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/FloatType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class FloatType implements SerializationTestType { - - private float value; - - public FloatType() { - this.value = 0; - } - - private FloatType(float value) { - this.value = value; - } - - @Override - public FloatType getRandom(Random rnd) { - return new FloatType(rnd.nextFloat()); - } - - @Override - public int length() { - return 4; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeFloat(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readFloat(); - } - - @Override - public int hashCode() { - return Float.floatToIntBits(this.value); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof FloatType) { - FloatType other = (FloatType) obj; - return Float.floatToIntBits(this.value) == Float.floatToIntBits(other.value); - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java deleted file mode 100644 index 4c6429d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/IntType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class IntType implements SerializationTestType { - - private int value; - - public IntType() { - this.value = 0; - } - - public IntType(int value) { - this.value = value; - } - - @Override - public IntType getRandom(Random rnd) { - return new IntType(rnd.nextInt()); - } - - @Override - public int length() { - return 4; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readInt(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof IntType) { - IntType other = (IntType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java deleted file mode 100644 index 934dfb7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/LongType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class LongType implements SerializationTestType { - - private long value; - - public LongType() { - this.value = 0; - } - - private LongType(long value) { - this.value = value; - } - - @Override - public LongType getRandom(Random rnd) { - return new LongType(rnd.nextLong()); - } - - @Override - public int length() { - return 8; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeLong(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readLong(); - } - - @Override - public int hashCode() { - return (int) (this.value ^ this.value >>> 32); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof LongType) { - LongType other = (LongType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java deleted file mode 100644 index 69db122..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestType.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.util.Random; - -import org.apache.flink.core.io.IOReadableWritable; - -public interface SerializationTestType extends IOReadableWritable { - - public SerializationTestType getRandom(Random rnd); - - public int length(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java deleted file mode 100644 index 03a093f..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -public enum SerializationTestTypeFactory { - - BOOLEAN(new BooleanType()), - BYTE_ARRAY(new ByteArrayType()), - BYTE_SUB_ARRAY(new ByteSubArrayType()), - BYTE(new ByteType()), - CHAR(new CharType()), - DOUBLE(new DoubleType()), - FLOAT(new FloatType()), - INT(new IntType()), - LONG(new LongType()), - SHORT(new ShortType()), - UNSIGNED_BYTE(new UnsignedByteType()), - UNSIGNED_SHORT(new UnsignedShortType()), - STRING(new AsciiStringType()); - - private final SerializationTestType factory; - - SerializationTestTypeFactory(SerializationTestType type) { - this.factory = type; - } - - public SerializationTestType factory() { - return this.factory; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java deleted file mode 100644 index 69e0ffc..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/ShortType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class ShortType implements SerializationTestType { - - private short value; - - public ShortType() { - this.value = (short) 0; - } - - private ShortType(short value) { - this.value = value; - } - - @Override - public ShortType getRandom(Random rnd) { - return new ShortType((short) rnd.nextInt(65536)); - } - - @Override - public int length() { - return 2; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeShort(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readShort(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof ShortType) { - ShortType other = (ShortType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java deleted file mode 100644 index eabb1dd..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedByteType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class UnsignedByteType implements SerializationTestType { - - private int value; - - public UnsignedByteType() { - this.value = 0; - } - - private UnsignedByteType(int value) { - this.value = value; - } - - @Override - public UnsignedByteType getRandom(Random rnd) { - return new UnsignedByteType(rnd.nextInt(128) + 128); - } - - @Override - public int length() { - return 1; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeByte(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readUnsignedByte(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof UnsignedByteType) { - UnsignedByteType other = (UnsignedByteType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java deleted file mode 100644 index 6242900..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/UnsignedShortType.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.io.IOException; -import java.util.Random; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -public class UnsignedShortType implements SerializationTestType { - - private int value; - - public UnsignedShortType() { - this.value = 0; - } - - private UnsignedShortType(int value) { - this.value = value; - } - - @Override - public UnsignedShortType getRandom(Random rnd) { - return new UnsignedShortType(rnd.nextInt(32768) + 32768); - } - - @Override - public int length() { - return 2; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeShort(this.value); - } - - @Override - public void read(DataInputView in) throws IOException { - this.value = in.readUnsignedShort(); - } - - @Override - public int hashCode() { - return this.value; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof UnsignedShortType) { - UnsignedShortType other = (UnsignedShortType) obj; - return this.value == other.value; - } else { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java deleted file mode 100644 index 110f7ce..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/Util.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.serialization.types; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Random; - -/** - * Utility class to help serialization for testing. - */ -public final class Util { - - private static final long SEED = 64871654635745873L; - - private static Random random = new Random(SEED); - - public static SerializationTestType randomRecord(SerializationTestTypeFactory type) { - return type.factory().getRandom(Util.random); - } - - public static MockRecords randomRecords(final int numElements, final SerializationTestTypeFactory type) { - - return new MockRecords(numElements) { - @Override - protected SerializationTestType getRecord() { - return type.factory().getRandom(Util.random); - } - }; - } - - public static MockRecords randomRecords(final int numElements) { - - return new MockRecords(numElements) { - @Override - protected SerializationTestType getRecord() { - // select random test type factory - SerializationTestTypeFactory[] types = SerializationTestTypeFactory.values(); - int i = Util.random.nextInt(types.length); - - return types[i].factory().getRandom(Util.random); - } - }; - } - - // ----------------------------------------------------------------------------------------------------------------- - public abstract static class MockRecords implements Iterable<SerializationTestType> { - - private int numRecords; - - public MockRecords(int numRecords) { - this.numRecords = numRecords; - } - - @Override - public Iterator<SerializationTestType> iterator() { - return new Iterator<SerializationTestType>() { - @Override - public boolean hasNext() { - return numRecords > 0; - } - - @Override - public SerializationTestType next() { - if (numRecords > 0) { - numRecords--; - - return getRecord(); - } - - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - abstract protected SerializationTestType getRecord(); - } - - /** - * No instantiation. - */ - private Util() { - throw new RuntimeException(); - } -}
