Fix conflicts after rebase of PR #249

This closes #249.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7df6a3d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7df6a3d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7df6a3d7

Branch: refs/heads/master
Commit: 7df6a3d7266b0f934b76722732176dbf5469bdb4
Parents: 5970e21
Author: Ufuk Celebi <[email protected]>
Authored: Tue Jan 20 17:26:02 2015 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:59 2015 +0100

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  6 +--
 .../iomanager/AsynchronousFileIOChannel.java    | 37 +++++++++++++---
 .../runtime/io/disk/iomanager/IOManager.java    |  1 +
 .../api/reader/AbstractRecordReader.java        | 10 ++---
 ...llingAdaptiveSpanningRecordDeserializer.java | 38 +++++++++++++----
 .../sort/CombiningUnilateralSortMerger.java     | 25 ++++++-----
 .../operators/sort/LargeRecordHandler.java      |  5 +++
 .../runtime/operators/sort/MergeIterator.java   |  8 ++--
 .../runtime/util/DataOutputSerializer.java      | 14 +++---
 .../AsynchronousFileIOChannelsTest.java         | 16 ++++---
 .../io/disk/iomanager/IOManagerTest.java        | 14 +++---
 .../SpanningRecordSerializationTest.java        | 26 ++++++-----
 .../SpanningRecordSerializerTest.java           | 45 ++++++++++----------
 .../network/serialization/LargeRecordsTest.java | 31 +++++++++-----
 .../serialization/types/LargeObjectType.java    |  1 +
 .../sort/ExternalSortLargeRecordsITCase.java    | 44 +++++++++++++------
 16 files changed, 202 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 5ac12d5..59ec7f0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -40,7 +40,7 @@ under the License.
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
@@ -81,13 +81,13 @@ under the License.
                        <artifactId>netty-all</artifactId>
                        <version>4.0.24.Final</version>
                </dependency>
-               
+
                <dependency>
                        <groupId>org.codehaus.jettison</groupId>
                        <artifactId>jettison</artifactId>
                        <version>1.1</version>
                </dependency>
-               
+
                <!--  guava needs to be in "provided" scope, to make sure it is 
not included into the jars by the shading -->
                <dependency>
                        <groupId>com.google.guava</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 9a9ee61..281eaad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -72,7 +72,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends 
IORequest> extends
         * @throws IOException Thrown, if the channel could no be opened.
         */
        protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, 
RequestQueue<R> requestQueue, 
-                       RequestDoneCallback callback, boolean writeEnabled) 
throws IOException
+                       RequestDoneCallback<T> callback, boolean writeEnabled) 
throws IOException
        {
                super(channelID, writeEnabled);
 
@@ -190,9 +190,6 @@ public abstract class AsynchronousFileIOChannel<T, R 
extends IORequest> extends
                                        this.closeLock.notifyAll();
                                }
                        }
-                       else {
-                               this.requestsNotReturned.decrementAndGet();
-                       }
                }
        }
 
@@ -279,3 +276,31 @@ final class SegmentWriteRequest implements WriteRequest {
                this.channel.handleProcessedBuffer(this.segment, ioex);
        }
 }
+
+/**
+ * Request that seeks the underlying file channel to the given position.
+ */
+final class SeekRequest implements ReadRequest, WriteRequest {
+
+       private final AsynchronousFileIOChannel<?, ?> channel;
+       private final long position;
+
+       protected SeekRequest(AsynchronousFileIOChannel<?, ?> channel, long 
position) {
+               this.channel = channel;
+               this.position = position;
+       }
+
+       @Override
+       public void requestDone(IOException ioex) {
+       }
+
+       @Override
+       public void read() throws IOException {
+               this.channel.fileChannel.position(position);
+       }
+
+       @Override
+       public void write() throws IOException {
+               this.channel.fileChannel.position(position);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index e58c4d8..6cf19f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 15b8dcc..2ee3256 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.util.event.EventListener;
 
+import java.io.IOException;
+
 /**
  * A record-oriented runtime result reader, which wraps a {@link 
BufferReaderBase}.
  * <p>
@@ -50,9 +50,9 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> implements Rea
                this.reader = reader;
 
                // Initialize one deserializer per input channel
-               this.recordDeserializers = new 
AdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()];
+               this.recordDeserializers = new 
SpillingAdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()];
                for (int i = 0; i < recordDeserializers.length; i++) {
-                       recordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T>();
+                       recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<T>();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 371ba0a..5de42b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -18,6 +18,17 @@
 
 package org.apache.flink.runtime.io.network.serialization;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.util.StringUtils;
+
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.EOFException;
@@ -31,14 +42,6 @@ import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.util.StringUtils;
-
 /**
  * @param <T> The type of the record to be deserialized.
  */
@@ -50,6 +53,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
        
        private final SpanningWrapper spanningWrapper;
 
+       private Buffer currentBuffer;
+
        public SpillingAdaptiveSpanningRecordDeserializer() {
                
                String tempDirString = GlobalConfiguration.getString(
@@ -60,6 +65,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                this.nonSpanningWrapper = new NonSpanningWrapper();
                this.spanningWrapper = new SpanningWrapper(directories);
        }
+
+       @Override
+       public void setNextBuffer(Buffer buffer) throws IOException {
+               currentBuffer = buffer;
+
+               MemorySegment segment = buffer.getMemorySegment();
+               int numBytes = buffer.getSize();
+
+               setNextMemorySegment(segment, numBytes);
+       }
+
+       @Override
+       public Buffer getCurrentBuffer () {
+               Buffer tmp = currentBuffer;
+               currentBuffer = null;
+               return tmp;
+       }
        
        @Override
        public void setNextMemorySegment(MemorySegment segment, int numBytes) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 35297ca..3466024 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -36,18 +26,27 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.util.EmptyMutableObjectIterator;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.TraversableOnceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
 
 /**
  * The CombiningUnilateralSortMerger is part of a merge-sort implementation. 
@@ -417,7 +416,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                
                                // get the readers and register them to be 
released
                                final MergeIterator<E> mergeIterator = 
getMergingIterator(
-                                               channelIDs, readBuffers, new 
ArrayList<FileIOChannel>(channelIDs.size()));
+                                               channelIDs, readBuffers, new 
ArrayList<FileIOChannel>(channelIDs.size()), null);
                                
                                // set the target for the user iterator
                                // if the final merge combines, create a 
combining iterator around the merge iterator,

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index 83e003f..e1be59a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -447,5 +447,10 @@ public class LargeRecordHandler<T> {
                                return null;
                        }
                }
+
+               @Override
+               public T next() throws IOException {
+                       return next(serializer.createInstance());
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 759e0e9..9da429d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.Comparator;
-import java.util.List;
-
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
 /**
  * An iterator that returns a sorted merge of the sequences of elements from a
  * set of iterators, assuming those sequences are ordered themselves.

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index b7a3715..2d06e29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -16,13 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.serialization;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
+package org.apache.flink.runtime.util;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,6 +24,12 @@ import org.apache.flink.core.memory.MemoryUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
 /**
  * A simple and efficient serializer for the {@link java.io.DataOutput} 
interface.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
index 1e9d4d4..94409a4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import static org.junit.Assert.*;
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class AsynchronousFileIOChannelsTest {
 
@@ -40,7 +42,7 @@ public class AsynchronousFileIOChannelsTest {
                        final AtomicInteger callbackCounter = new 
AtomicInteger();
                        final AtomicBoolean exceptionOccurred = new 
AtomicBoolean();
                        
-                       final RequestDoneCallback callback = new 
RequestDoneCallback() {
+                       final RequestDoneCallback<MemorySegment> callback = new 
RequestDoneCallback<MemorySegment>() {
                                
                                @Override
                                public void requestSuccessful(MemorySegment 
buffer) {
@@ -142,7 +144,7 @@ public class AsynchronousFileIOChannelsTest {
                }
        }
        
-       private static class NoOpCallback implements RequestDoneCallback {
+       private static class NoOpCallback implements 
RequestDoneCallback<MemorySegment> {
 
                @Override
                public void requestSuccessful(MemorySegment buffer) {}
@@ -153,11 +155,11 @@ public class AsynchronousFileIOChannelsTest {
        
        private static class FailingWriteRequest implements WriteRequest {
                
-               private final AsynchronousFileIOChannel<WriteRequest> channel;
+               private final AsynchronousFileIOChannel<MemorySegment, 
WriteRequest> channel;
                
                private final MemorySegment segment;
                
-               protected 
FailingWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, 
MemorySegment segment) {
+               protected 
FailingWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> 
targetChannel, MemorySegment segment) {
                        this.channel = targetChannel;
                        this.segment = segment;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index ab5c206..4be667a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.junit.Test;
 
 import java.io.File;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class IOManagerTest {
 
@@ -94,7 +94,7 @@ public class IOManagerTest {
                }
 
                @Override
-               public BlockChannelWriterWithCallback 
createBlockChannelWriter(ID channelID, RequestDoneCallback callback) {
+               public BlockChannelWriterWithCallback 
createBlockChannelWriter(ID channelID, RequestDoneCallback<MemorySegment> 
callback) {
                        throw new UnsupportedOperationException();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 6ceb05a..be63fe5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -16,24 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.junit.Assert;
+package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import 
org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import 
org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import 
org.apache.flink.runtime.io.network.serialization.RecordDeserializer.DeserializationResult;
-import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import 
org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayDeque;
 
+import static org.mockito.Mockito.mock;
+
 public class SpanningRecordSerializationTest {
 
        @Test
@@ -128,7 +126,7 @@ public class SpanningRecordSerializationTest {
        {
                final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
-               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[segmentSize]), segmentSize, null);
+               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[segmentSize]), mock(BufferRecycler.class));
 
                final ArrayDeque<SerializationTestType> serializedRecords = new 
ArrayDeque<SerializationTestType>();
 
@@ -181,7 +179,7 @@ public class SpanningRecordSerializationTest {
                        SerializationTestType expected = 
serializedRecords.poll();
 
                        SerializationTestType actual = 
expected.getClass().newInstance();
-                       DeserializationResult result = 
deserializer.getNextRecord(actual);
+                       RecordDeserializer.DeserializationResult result = 
deserializer.getNextRecord(actual);
 
                        Assert.assertTrue(result.isFullRecord());
                        Assert.assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index 920d683..50d3639 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -16,23 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.serialization;
-
-import org.junit.Assert;
+package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import 
org.apache.flink.runtime.io.network.serialization.RecordSerializer.SerializationResult;
-import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.runtime.io.network.serialization.types.Util;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestTypeFactory;
+import org.apache.flink.runtime.io.network.api.serialization.types.Util;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Random;
 
+import static org.mockito.Mockito.mock;
+
 public class SpanningRecordSerializerTest {
 
        @Test
@@ -40,7 +41,7 @@ public class SpanningRecordSerializerTest {
                final int SEGMENT_SIZE = 16;
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
                final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
                Assert.assertFalse(serializer.hasData());
@@ -74,10 +75,10 @@ public class SpanningRecordSerializerTest {
                final int SEGMENT_SIZE = 11;
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
 
                try {
-                       Assert.assertEquals(SerializationResult.FULL_RECORD, 
serializer.setNextBuffer(buffer));
+                       
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.setNextBuffer(buffer));
                } catch (IOException e) {
                        e.printStackTrace();
                }
@@ -111,17 +112,17 @@ public class SpanningRecordSerializerTest {
                                }
                        };
 
-                       SerializationResult result = 
serializer.addRecord(emptyRecord);
-                       Assert.assertEquals(SerializationResult.FULL_RECORD, 
result);
+                       RecordSerializer.SerializationResult result = 
serializer.addRecord(emptyRecord);
+                       
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 
                        result = serializer.addRecord(emptyRecord);
-                       Assert.assertEquals(SerializationResult.FULL_RECORD, 
result);
+                       
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 
                        result = serializer.addRecord(emptyRecord);
-                       
Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, 
result);
+                       
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
                        result = serializer.setNextBuffer(buffer);
-                       Assert.assertEquals(SerializationResult.FULL_RECORD, 
result);
+                       
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -189,7 +190,7 @@ public class SpanningRecordSerializerTest {
 
        /**
         * Iterates over the provided records and tests whether the {@link 
SpanningRecordSerializer} returns the expected
-        * {@link SerializationResult} values.
+        * {@link RecordSerializer.SerializationResult} values.
         * <p>
         * Only a single {@link MemorySegment} will be allocated.
         *
@@ -200,7 +201,7 @@ public class SpanningRecordSerializerTest {
                final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[segmentSize]), Mockito.mock(BufferRecycler.class));
+               final Buffer buffer = new Buffer(new MemorySegment(new 
byte[segmentSize]), mock(BufferRecycler.class));
 
                // 
-------------------------------------------------------------------------------------------------------------
 
@@ -208,17 +209,17 @@ public class SpanningRecordSerializerTest {
 
                int numBytes = 0;
                for (SerializationTestType record : records) {
-                       SerializationResult result = 
serializer.addRecord(record);
+                       RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
                        numBytes += record.length() + SERIALIZATION_OVERHEAD;
 
                        if (numBytes < segmentSize) {
-                               
Assert.assertEquals(SerializationResult.FULL_RECORD, result);
+                               
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
                        } else if (numBytes == segmentSize) {
-                               
Assert.assertEquals(SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, 
result);
+                               
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
                                serializer.setNextBuffer(buffer);
                                numBytes = 0;
                        } else {
-                               
Assert.assertEquals(SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, 
result);
+                               
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
                                while (result.isFullBuffer()) {
                                        numBytes -= segmentSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index 6c1fd64..5ce145b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,18 +18,27 @@
 
 package org.apache.flink.runtime.io.network.serialization;
 
-import static org.junit.Assert.*;
+import org.apache.flink.core.memory.MemorySegment;
+import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.types.IntType;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.serialization.types.IntType;
-import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
-import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class LargeRecordsTest {
 
@@ -42,7 +51,7 @@ public class LargeRecordsTest {
                        final RecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
                        final RecordDeserializer<SerializationTestType> 
deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-                       final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+                       final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
 
                        List<SerializationTestType> originalRecords = new 
ArrayList<SerializationTestType>();
                        List<SerializationTestType> deserializedRecords = new 
ArrayList<SerializationTestType>();
@@ -108,7 +117,7 @@ public class LargeRecordsTest {
                        
                        // move the last (incomplete buffer)
                        Buffer last = serializer.getCurrentBuffer();
-                       
deserializer.setNextMemorySegment(last.getMemorySegment(), last.size());
+                       
deserializer.setNextMemorySegment(last.getMemorySegment(), last.getSize());
                        serializer.clear();
                        
                        // deserialize records, as many as there are in the 
last buffer
@@ -139,7 +148,7 @@ public class LargeRecordsTest {
                        final RecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
                        final RecordDeserializer<SerializationTestType> 
deserializer = new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-                       final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+                       final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), mock(BufferRecycler.class));
 
                        List<SerializationTestType> originalRecords = new 
ArrayList<SerializationTestType>();
                        List<SerializationTestType> deserializedRecords = new 
ArrayList<SerializationTestType>();
@@ -205,7 +214,7 @@ public class LargeRecordsTest {
                        
                        // move the last (incomplete buffer)
                        Buffer last = serializer.getCurrentBuffer();
-                       
deserializer.setNextMemorySegment(last.getMemorySegment(), last.size());
+                       
deserializer.setNextMemorySegment(last.getMemorySegment(), last.getSize());
                        serializer.clear();
                        
                        // deserialize records, as many as there are in the 
last buffer

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
index 01a00e4..21be6e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
@@ -24,6 +24,7 @@ import java.util.Random;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.runtime.io.network.api.serialization.types.SerializationTestType;
 
 public class LargeObjectType implements SerializationTestType {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7df6a3d7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index ad15282..38442c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -48,6 +40,14 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class ExternalSortLargeRecordsITCase {
 
@@ -116,7 +116,12 @@ public class ExternalSortLargeRecordsITCase {
                                        }
                                        
                                }
-                       };
+
+                                               @Override
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next() throws IOException {
+                                                       return next(new 
Tuple2<Long, SomeMaybeLongValue>());
+                                               }
+                                       };
                        
                        @SuppressWarnings("unchecked")
                        Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
@@ -182,7 +187,12 @@ public class ExternalSortLargeRecordsITCase {
                                        }
                                        
                                }
-                       };
+
+                                               @Override
+                                               public Tuple2<Long, 
SomeMaybeLongValue> next() throws IOException {
+                                                       return new Tuple2<Long, 
SomeMaybeLongValue>();
+                                               }
+                                       };
                        
                        @SuppressWarnings("unchecked")
                        Sorter<Tuple2<Long, SomeMaybeLongValue>> sorter = new 
UnilateralSortMerger<Tuple2<Long, SomeMaybeLongValue>>(
@@ -260,7 +270,12 @@ public class ExternalSortLargeRecordsITCase {
                                        }
                                        
                                }
-                       };
+
+                                               @Override
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() throws IOException {
+                                                       return new Tuple2<Long, 
SmallOrMediumOrLargeValue>();
+                                               }
+                                       };
                        
                        @SuppressWarnings("unchecked")
                        Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter 
= new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
@@ -326,7 +341,12 @@ public class ExternalSortLargeRecordsITCase {
                                        }
                                        
                                }
-                       };
+
+                                               @Override
+                                               public Tuple2<Long, 
SmallOrMediumOrLargeValue> next() throws IOException {
+                                                       return new Tuple2<Long, 
SmallOrMediumOrLargeValue>();
+                                               }
+                                       };
                        
                        @SuppressWarnings("unchecked")
                        Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter 
= new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(

Reply via email to