Repository: flink Updated Branches: refs/heads/master 198b74a85 -> 37df826e4
http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java index 25523db..d596863 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java @@ -24,8 +24,8 @@ import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRec import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.types.IntType; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.IntType; +import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java index 21be6e4..8efd2bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java @@ -24,7 +24,7 @@ import java.util.Random; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; +import org.apache.flink.testutils.serialization.types.SerializationTestType; public class LargeObjectType implements SerializationTestType { http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java deleted file mode 100644 index 2032d45..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; - -/** - * Test suite for the {@link DataInputDeserializer} class. - */ -public class DataInputDeserializerTest { - - @Test - public void testAvailable() throws Exception { - byte[] bytes; - DataInputDeserializer dis; - - bytes = new byte[] {}; - dis = new DataInputDeserializer(bytes, 0, bytes.length); - Assert.assertEquals(bytes.length, dis.available()); - - bytes = new byte[] {1, 2, 3}; - dis = new DataInputDeserializer(bytes, 0, bytes.length); - Assert.assertEquals(bytes.length, dis.available()); - - dis.readByte(); - Assert.assertEquals(2, dis.available()); - dis.readByte(); - Assert.assertEquals(1, dis.available()); - dis.readByte(); - Assert.assertEquals(0, dis.available()); - - try { - dis.readByte(); - Assert.fail("Did not throw expected IOException"); - } catch (IOException e) { - // ignore - } - Assert.assertEquals(0, dis.available()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java deleted file mode 100644 index 3631ba1..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputOutputSerializerTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.junit.Assert; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.api.serialization.types.Util; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; - -public class DataInputOutputSerializerTest { - - @Test - public void testWrapAsByteBuffer() { - SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT); - - DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length()); - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(randomInt.length()); - - try { - // empty buffer, read buffer should be empty - ByteBuffer wrapper = serializer.wrapAsByteBuffer(); - - Assert.assertEquals(0, wrapper.position()); - Assert.assertEquals(0, wrapper.limit()); - - // write to data output, read buffer should still be empty - randomInt.write(serializer); - - Assert.assertEquals(0, wrapper.position()); - Assert.assertEquals(0, wrapper.limit()); - - // get updated read buffer, read buffer should contain written data - wrapper = serializer.wrapAsByteBuffer(); - - Assert.assertEquals(0, wrapper.position()); - Assert.assertEquals(randomInt.length(), wrapper.limit()); - - // clear data output, read buffer should still contain written data - serializer.clear(); - - Assert.assertEquals(0, wrapper.position()); - Assert.assertEquals(randomInt.length(), wrapper.limit()); - - // get updated read buffer, should be empty - wrapper = serializer.wrapAsByteBuffer(); - - Assert.assertEquals(0, wrapper.position()); - Assert.assertEquals(0, wrapper.limit()); - - // write to data output and read back to memory - randomInt.write(serializer); - wrapper = serializer.wrapAsByteBuffer(); - - segment.put(0, wrapper, randomInt.length()); - - Assert.assertEquals(randomInt.length(), wrapper.position()); - Assert.assertEquals(randomInt.length(), wrapper.limit()); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail("Test encountered an unexpected exception."); - } - } - - @Test - public void testRandomValuesWriteRead() { - final int numElements = 100000; - final ArrayDeque<SerializationTestType> reference = new ArrayDeque<SerializationTestType>(); - - DataOutputSerializer serializer = new DataOutputSerializer(1); - - for (SerializationTestType value : Util.randomRecords(numElements)) { - reference.add(value); - - try { - value.write(serializer); - } catch (IOException e) { - e.printStackTrace(); - Assert.fail("Test encountered an unexpected exception."); - } - } - - DataInputDeserializer deserializer = new DataInputDeserializer(serializer.wrapAsByteBuffer()); - - for (SerializationTestType expected : reference) { - try { - SerializationTestType actual = expected.getClass().newInstance(); - actual.read(deserializer); - - Assert.assertEquals(expected, actual); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test encountered an unexpected exception."); - } - } - - reference.clear(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java index 26cc5ad..1a8bc58 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java @@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; import java.util.ArrayDeque; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java index b187b04..1c50dc2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java @@ -22,8 +22,8 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/37df826e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java index 79b2b75..4a99317 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java @@ -21,8 +21,8 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.junit.Test;
