Fix conflicts after rebase of PR #249 This closes #249.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7df6a3d7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7df6a3d7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7df6a3d7 Branch: refs/heads/master Commit: 7df6a3d7266b0f934b76722732176dbf5469bdb4 Parents: 5970e21 Author: Ufuk Celebi <[email protected]> Authored: Tue Jan 20 17:26:02 2015 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:59 2015 +0100 ---------------------------------------------------------------------- flink-runtime/pom.xml | 6 +-- .../iomanager/AsynchronousFileIOChannel.java | 37 +++++++++++++--- .../runtime/io/disk/iomanager/IOManager.java | 1 + .../api/reader/AbstractRecordReader.java | 10 ++--- ...llingAdaptiveSpanningRecordDeserializer.java | 38 +++++++++++++---- .../sort/CombiningUnilateralSortMerger.java | 25 ++++++----- .../operators/sort/LargeRecordHandler.java | 5 +++ .../runtime/operators/sort/MergeIterator.java | 8 ++-- .../runtime/util/DataOutputSerializer.java | 14 +++--- .../AsynchronousFileIOChannelsTest.java | 16 ++++--- .../io/disk/iomanager/IOManagerTest.java | 14 +++--- .../SpanningRecordSerializationTest.java | 26 ++++++----- .../SpanningRecordSerializerTest.java | 45 ++++++++++---------- .../network/serialization/LargeRecordsTest.java | 31 +++++++++----- .../serialization/types/LargeObjectType.java | 1 + .../sort/ExternalSortLargeRecordsITCase.java | 44 +++++++++++++------ 16 files changed, 202 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 5ac12d5..59ec7f0 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -40,7 +40,7 @@ under the License. <artifactId>flink-core</artifactId> <version>${project.version}</version> </dependency> - + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> @@ -81,13 +81,13 @@ under the License. <artifactId>netty-all</artifactId> <version>4.0.24.Final</version> </dependency> - + <dependency> <groupId>org.codehaus.jettison</groupId> <artifactId>jettison</artifactId> <version>1.1</version> </dependency> - + <!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading --> <dependency> <groupId>com.google.guava</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java index 9a9ee61..281eaad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.core.memory.MemorySegment; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.flink.core.memory.MemorySegment; - import static com.google.common.base.Preconditions.checkNotNull; /** @@ -72,7 +72,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends * @throws IOException Thrown, if the channel could no be opened. */ protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, - RequestDoneCallback callback, boolean writeEnabled) throws IOException + RequestDoneCallback<T> callback, boolean writeEnabled) throws IOException { super(channelID, writeEnabled); @@ -190,9 +190,6 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends this.closeLock.notifyAll(); } } - else { - this.requestsNotReturned.decrementAndGet(); - } } } @@ -279,3 +276,31 @@ final class SegmentWriteRequest implements WriteRequest { this.channel.handleProcessedBuffer(this.segment, ioex); } } + +/** + * Request that seeks the underlying file channel to the given position. + */ +final class SeekRequest implements ReadRequest, WriteRequest { + + private final AsynchronousFileIOChannel<?, ?> channel; + private final long position; + + protected SeekRequest(AsynchronousFileIOChannel<?, ?> channel, long position) { + this.channel = channel; + this.position = position; + } + + @Override + public void requestDone(IOException ioex) { + } + + @Override + public void read() throws IOException { + this.channel.fileChannel.position(position); + } + + @Override + public void write() throws IOException { + this.channel.fileChannel.position(position); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index e58c4d8..6cf19f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 15b8dcc..2ee3256 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -18,16 +18,16 @@ package org.apache.flink.runtime.io.network.api.reader; -import java.io.IOException; - import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.event.task.TaskEvent; -import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.util.event.EventListener; +import java.io.IOException; + /** * A record-oriented runtime result reader, which wraps a {@link BufferReaderBase}. * <p> @@ -50,9 +50,9 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> implements Rea this.reader = reader; // Initialize one deserializer per input channel - this.recordDeserializers = new AdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()]; + this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()]; for (int i = 0; i < recordDeserializers.length; i++) { - recordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T>(); + recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 371ba0a..5de42b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -18,6 +18,17 @@ package org.apache.flink.runtime.io.network.serialization; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.util.StringUtils; + import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.EOFException; @@ -31,14 +42,6 @@ import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.util.Random; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.util.StringUtils; - /** * @param <T> The type of the record to be deserialized. */ @@ -50,6 +53,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit private final SpanningWrapper spanningWrapper; + private Buffer currentBuffer; + public SpillingAdaptiveSpanningRecordDeserializer() { String tempDirString = GlobalConfiguration.getString( @@ -60,6 +65,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit this.nonSpanningWrapper = new NonSpanningWrapper(); this.spanningWrapper = new SpanningWrapper(directories); } + + @Override + public void setNextBuffer(Buffer buffer) throws IOException { + currentBuffer = buffer; + + MemorySegment segment = buffer.getMemorySegment(); + int numBytes = buffer.getSize(); + + setNextMemorySegment(segment, numBytes); + } + + @Override + public Buffer getCurrentBuffer () { + Buffer tmp = currentBuffer; + currentBuffer = null; + return tmp; + } @Override public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 35297ca..3466024 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -18,16 +18,6 @@ package org.apache.flink.runtime.operators.sort; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -36,18 +26,27 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryAllocationException; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.util.EmptyMutableObjectIterator; -import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; import org.apache.flink.util.TraversableOnceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; /** * The CombiningUnilateralSortMerger is part of a merge-sort implementation. @@ -417,7 +416,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // get the readers and register them to be released final MergeIterator<E> mergeIterator = getMergingIterator( - channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size())); + channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()), null); // set the target for the user iterator // if the final merge combines, create a combining iterator around the merge iterator, http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java index 83e003f..e1be59a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -447,5 +447,10 @@ public class LargeRecordHandler<T> { return null; } } + + @Override + public T next() throws IOException { + return next(serializer.createInstance()); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java index 759e0e9..9da429d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java @@ -18,14 +18,14 @@ package org.apache.flink.runtime.operators.sort; -import java.io.IOException; -import java.util.Comparator; -import java.util.List; - import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.MutableObjectIterator; +import java.io.IOException; +import java.util.Comparator; +import java.util.List; + /** * An iterator that returns a sorted merge of the sequences of elements from a * set of iterators, assuming those sequences are ordered themselves. http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index b7a3715..2d06e29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -16,13 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.serialization; - -import java.io.EOFException; -import java.io.IOException; -import java.io.UTFDataFormatException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; +package org.apache.flink.runtime.util; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -30,6 +24,12 @@ import org.apache.flink.core.memory.MemoryUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.EOFException; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + /** * A simple and efficient serializer for the {@link java.io.DataOutput} interface. */ http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java index 1e9d4d4..94409a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java @@ -18,14 +18,16 @@ package org.apache.flink.runtime.io.disk.iomanager; -import static org.junit.Assert.*; +import org.apache.flink.core.memory.MemorySegment; +import org.junit.Test; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.flink.core.memory.MemorySegment; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class AsynchronousFileIOChannelsTest { @@ -40,7 +42,7 @@ public class AsynchronousFileIOChannelsTest { final AtomicInteger callbackCounter = new AtomicInteger(); final AtomicBoolean exceptionOccurred = new AtomicBoolean(); - final RequestDoneCallback callback = new RequestDoneCallback() { + final RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() { @Override public void requestSuccessful(MemorySegment buffer) { @@ -142,7 +144,7 @@ public class AsynchronousFileIOChannelsTest { } } - private static class NoOpCallback implements RequestDoneCallback { + private static class NoOpCallback implements RequestDoneCallback<MemorySegment> { @Override public void requestSuccessful(MemorySegment buffer) {} @@ -153,11 +155,11 @@ public class AsynchronousFileIOChannelsTest { private static class FailingWriteRequest implements WriteRequest { - private final AsynchronousFileIOChannel<WriteRequest> channel; + private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel; private final MemorySegment segment; - protected FailingWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) { + protected FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) { this.channel = targetChannel; this.segment = segment; } http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index ab5c206..4be667a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -18,17 +18,17 @@ package org.apache.flink.runtime.io.disk.iomanager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; +import org.junit.Test; import java.io.File; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class IOManagerTest { @@ -94,7 +94,7 @@ public class IOManagerTest { } @Override - public BlockChannelWriterWithCallback createBlockChannelWriter(ID channelID, RequestDoneCallback callback) { + public BlockChannelWriterWithCallback createBlockChannelWriter(ID channelID, RequestDoneCallback<MemorySegment> callback) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index 6ceb05a..be63fe5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -16,24 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.serialization; - -import org.junit.Assert; +package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer; -import org.apache.flink.runtime.io.network.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult; -import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.serialization.types.Util; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.runtime.io.network.api.serialization.types.Util; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.junit.Assert; import org.junit.Test; import java.util.ArrayDeque; +import static org.mockito.Mockito.mock; + public class SpanningRecordSerializationTest { @Test @@ -128,7 +126,7 @@ public class SpanningRecordSerializationTest { { final int SERIALIZATION_OVERHEAD = 4; // length encoding - final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), segmentSize, null); + final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class)); final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>(); @@ -181,7 +179,7 @@ public class SpanningRecordSerializationTest { SerializationTestType expected = serializedRecords.poll(); SerializationTestType actual = expected.getClass().newInstance(); - DeserializationResult result = deserializer.getNextRecord(actual); + RecordDeserializer.DeserializationResult result = deserializer.getNextRecord(actual); Assert.assertTrue(result.isFullRecord()); Assert.assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index 920d683..50d3639 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -16,23 +16,24 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.serialization; - -import org.junit.Assert; +package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.serialization.RecordSerializer.SerializationResult; -import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType; -import org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.runtime.io.network.serialization.types.Util; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory; +import org.apache.flink.runtime.io.network.api.serialization.types.Util; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.util.Random; +import static org.mockito.Mockito.mock; + public class SpanningRecordSerializerTest { @Test @@ -40,7 +41,7 @@ public class SpanningRecordSerializerTest { final int SEGMENT_SIZE = 16; final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null); + final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class)); final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); Assert.assertFalse(serializer.hasData()); @@ -74,10 +75,10 @@ public class SpanningRecordSerializerTest { final int SEGMENT_SIZE = 11; final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null); + final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class)); try { - Assert.assertEquals(SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer)); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer)); } catch (IOException e) { e.printStackTrace(); } @@ -111,17 +112,17 @@ public class SpanningRecordSerializerTest { } }; - SerializationResult result = serializer.addRecord(emptyRecord); - Assert.assertEquals(SerializationResult.FULL_RECORD, result); + RecordSerializer.SerializationResult result = serializer.addRecord(emptyRecord); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); result = serializer.addRecord(emptyRecord); - Assert.assertEquals(SerializationResult.FULL_RECORD, result); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); result = serializer.addRecord(emptyRecord); - Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); + Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); result = serializer.setNextBuffer(buffer); - Assert.assertEquals(SerializationResult.FULL_RECORD, result); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } catch (Exception e) { e.printStackTrace(); @@ -189,7 +190,7 @@ public class SpanningRecordSerializerTest { /** * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected - * {@link SerializationResult} values. + * {@link RecordSerializer.SerializationResult} values. * <p> * Only a single {@link MemorySegment} will be allocated. * @@ -200,7 +201,7 @@ public class SpanningRecordSerializerTest { final int SERIALIZATION_OVERHEAD = 4; // length encoding final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), Mockito.mock(BufferRecycler.class)); + final Buffer buffer = new Buffer(new MemorySegment(new byte[segmentSize]), mock(BufferRecycler.class)); // ------------------------------------------------------------------------------------------------------------- @@ -208,17 +209,17 @@ public class SpanningRecordSerializerTest { int numBytes = 0; for (SerializationTestType record : records) { - SerializationResult result = serializer.addRecord(record); + RecordSerializer.SerializationResult result = serializer.addRecord(record); numBytes += record.length() + SERIALIZATION_OVERHEAD; if (numBytes < segmentSize) { - Assert.assertEquals(SerializationResult.FULL_RECORD, result); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } else if (numBytes == segmentSize) { - Assert.assertEquals(SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); + Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); serializer.setNextBuffer(buffer); numBytes = 0; } else { - Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); + Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); while (result.isFullBuffer()) { numBytes -= segmentSize; http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java index 6c1fd64..5ce145b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java @@ -18,18 +18,27 @@ package org.apache.flink.runtime.io.network.serialization; -import static org.junit.Assert.*; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; +import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.api.serialization.types.IntType; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; +import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Random; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.serialization.types.IntType; -import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; -import org.apache.flink.runtime.io.network.serialization.types.SerializationTestType; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class LargeRecordsTest { @@ -42,7 +51,7 @@ public class LargeRecordsTest { final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>(); - final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null); + final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class)); List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>(); List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>(); @@ -108,7 +117,7 @@ public class LargeRecordsTest { // move the last (incomplete buffer) Buffer last = serializer.getCurrentBuffer(); - deserializer.setNextMemorySegment(last.getMemorySegment(), last.size()); + deserializer.setNextMemorySegment(last.getMemorySegment(), last.getSize()); serializer.clear(); // deserialize records, as many as there are in the last buffer @@ -139,7 +148,7 @@ public class LargeRecordsTest { final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(); - final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), SEGMENT_SIZE, null); + final Buffer buffer = new Buffer(new MemorySegment(new byte[SEGMENT_SIZE]), mock(BufferRecycler.class)); List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>(); List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>(); @@ -205,7 +214,7 @@ public class LargeRecordsTest { // move the last (incomplete buffer) Buffer last = serializer.getCurrentBuffer(); - deserializer.setNextMemorySegment(last.getMemorySegment(), last.size()); + deserializer.setNextMemorySegment(last.getMemorySegment(), last.getSize()); serializer.clear(); // deserialize records, as many as there are in the last buffer http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java index 01a00e4..21be6e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java @@ -24,6 +24,7 @@ import java.util.Random; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType; public class LargeObjectType implements SerializationTestType { http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index ad15282..38442c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.operators.sort; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.Random; - import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -48,6 +40,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class ExternalSortLargeRecordsITCase { @@ -116,7 +116,12 @@ public class ExternalSortLargeRecordsITCase { } } - }; + + @Override + public Tuple2<Long, SomeMaybeLongValue> next() throws IOException { + return next(new Tuple2<Long, SomeMaybeLongValue>()); + } + }; @SuppressWarnings("unchecked") Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( @@ -182,7 +187,12 @@ public class ExternalSortLargeRecordsITCase { } } - }; + + @Override + public Tuple2<Long, SomeMaybeLongValue> next() throws IOException { + return new Tuple2<Long, SomeMaybeLongValue>(); + } + }; @SuppressWarnings("unchecked") Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>( @@ -260,7 +270,12 @@ public class ExternalSortLargeRecordsITCase { } } - }; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next() throws IOException { + return new Tuple2<Long, SmallOrMediumOrLargeValue>(); + } + }; @SuppressWarnings("unchecked") Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>( @@ -326,7 +341,12 @@ public class ExternalSortLargeRecordsITCase { } } - }; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next() throws IOException { + return new Tuple2<Long, SmallOrMediumOrLargeValue>(); + } + }; @SuppressWarnings("unchecked") Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
