[FLINK-1296] [runtime] Add better paged disk I/O readers / writers

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/996d404c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/996d404c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/996d404c

Branch: refs/heads/master
Commit: 996d404ced9347aaa00de4356c07333d46322eae
Parents: 7610588
Author: Stephan Ewen <[email protected]>
Authored: Mon Dec 1 20:21:25 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../runtime/io/disk/FileChannelInputView.java   | 148 ++++++++
 .../runtime/io/disk/FileChannelOutputView.java  | 144 ++++++++
 .../io/disk/SeekableFileChannelInputView.java   | 186 ++++++++++
 .../disk/iomanager/AbstractFileIOChannel.java   |  13 +-
 .../disk/iomanager/AsynchronousBlockReader.java |   7 +-
 .../iomanager/AsynchronousFileIOChannel.java    |  18 +-
 .../io/disk/iomanager/BlockChannelReader.java   |   7 +
 .../io/disk/iomanager/FileIOChannel.java        |   9 +-
 .../io/disk/iomanager/IOManagerAsync.java       |  14 +-
 .../runtime/memorymanager/MemoryManager.java    |   3 +-
 .../io/disk/FileChannelStreamsITCase.java       | 307 ++++++++++++++++
 .../runtime/io/disk/FileChannelStreamsTest.java | 119 ++++++
 .../disk/SeekableFileChannelInputViewTest.java  | 157 ++++++++
 .../AsynchronousFileIOChannelsTest.java         | 175 +++++++++
 .../io/disk/iomanager/IOManagerAsyncTest.java   | 359 +++++++++++++++++++
 .../io/disk/iomanager/IOManagerITCase.java      |  66 +---
 .../io/disk/iomanager/IOManagerTest.java        | 237 +++---------
 .../operators/testutils/PairGenerator.java      | 161 +++++++++
 18 files changed, 1872 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
new file mode 100644
index 0000000..9fb8072
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.MathUtils;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a 
{@link BlockChannelReader},
+ * making it effectively a data input stream. The view reads it data in blocks 
from the underlying channel.
+ * The view can read data that has been written by a {@link 
FileChannelOutputView}, or that was written in blocks
+ * in another fashion.
+ */
+public class FileChannelInputView extends AbstractPagedInputView {
+       
+       private final BlockChannelReader reader;
+       
+       private final MemoryManager memManager;
+       
+       private final List<MemorySegment> memory;
+       
+       private final int sizeOfLastBlock;
+       
+       private int numRequestsRemaining;
+       
+       private int numBlocksRemaining;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public FileChannelInputView(BlockChannelReader reader, MemoryManager 
memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException 
{
+               super(0);
+               
+               checkNotNull(reader);
+               checkNotNull(memManager);
+               checkNotNull(memory);
+               checkArgument(!reader.isClosed());
+               checkArgument(memory.size() > 0);
+               
+               this.reader = reader;
+               this.memManager = memManager;
+               this.memory = memory;
+               this.sizeOfLastBlock = sizeOfLastBlock;
+               
+               try {
+                       final long channelLength = reader.getSize();
+                       final int segmentSize = memManager.getPageSize();
+                       
+                       this.numBlocksRemaining = 
MathUtils.checkedDownCast(channelLength / segmentSize);
+                       if (channelLength % segmentSize != 0) {
+                               this.numBlocksRemaining++;
+                       }
+                       
+                       this.numRequestsRemaining = numBlocksRemaining;
+                       
+                       for (int i = 0; i < memory.size(); i++) {
+                               sendReadRequest(memory.get(i));
+                       }
+                       
+                       advance();
+               }
+               catch (IOException e) {
+                       memManager.release(memory);
+                       throw e;
+               }
+       }
+       
+       public void close() throws IOException {
+               close(false);
+       }
+       
+       public void closeAndDelete() throws IOException {
+               close(true);
+       }
+       
+       private void close(boolean deleteFile) throws IOException {
+               try {
+                       clear();
+                       if (deleteFile) {
+                               reader.closeAndDelete();
+                       } else {
+                               reader.close();
+                       }
+               } finally {
+                       synchronized (memory) {
+                               memManager.release(memory);
+                               memory.clear();
+                       }
+               }
+       }
+       
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current) throws 
IOException {
+               // check for end-of-stream
+               if (numBlocksRemaining <= 0) {
+                       reader.close();
+                       throw new EOFException();
+               }
+               
+               // send a request first. if we have only a single segment, this 
same segment will be the one obtained in the next lines
+               if (current != null) {
+                       sendReadRequest(current);
+               }
+               
+               // get the next segment
+               numBlocksRemaining--;
+               return reader.getNextReturnedSegment();
+       }
+       
+       @Override
+       protected int getLimitForSegment(MemorySegment segment) {
+               return numBlocksRemaining > 0 ? segment.size() : 
sizeOfLastBlock;
+       }
+       
+       private void sendReadRequest(MemorySegment seg) throws IOException {
+               if (numRequestsRemaining > 0) {
+                       reader.readBlock(seg);
+                       numRequestsRemaining--;
+               } else {
+                       memManager.release(seg);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
new file mode 100644
index 0000000..2b8b728
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a 
{@link BlockChannelWriter}, making it effectively a data output
+ * stream. The view writes it data in blocks to the underlying channel.
+ */
+public class FileChannelOutputView extends AbstractPagedOutputView {
+       
+       private final BlockChannelWriter writer;                // the writer 
to the channel
+       
+       private final MemoryManager memManager;
+       
+       private final List<MemorySegment> memory;
+       
+       private int numBlocksWritten;
+       
+       private int bytesInLatestSegment;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public FileChannelOutputView(BlockChannelWriter writer, MemoryManager 
memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
+               super(segmentSize, 0);
+               
+               checkNotNull(writer);
+               checkNotNull(memManager);
+               checkNotNull(memory);
+               checkArgument(!writer.isClosed());
+               
+               this.writer = writer;
+               this.memManager = memManager;
+               this.memory = memory;
+               
+               
+               for (MemorySegment next : memory) {
+                       writer.getReturnQueue().add(next);
+               }
+               
+               // move to the first page
+               advance();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Closes this output, writing pending data and releasing the memory.
+        * 
+        * @throws IOException Thrown, if the pending data could not be written.
+        */
+       public void close() throws IOException {
+               close(false);
+       }
+       
+       /**
+        * Closes this output, writing pending data and releasing the memory.
+        * 
+        * @throws IOException Thrown, if the pending data could not be written.
+        */
+       public void closeAndDelete() throws IOException {
+               close(true);
+       }
+       
+       private void close(boolean delete) throws IOException {
+               try {
+                       // send off set last segment, if we have not been 
closed before
+                       MemorySegment current = getCurrentSegment();
+                       if (current != null) {
+                               writeSegment(current, 
getCurrentPositionInSegment());
+                       }
+
+                       clear();
+                       if (delete) {
+                               writer.closeAndDelete();
+                       } else {
+                               writer.close();
+                       }
+               }
+               finally {
+                       memManager.release(memory);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Gets the number of blocks written by this output view.
+        * 
+        * @return The number of blocks written by this output view.
+        */
+       public int getBlockCount() {
+               return numBlocksWritten;
+       }
+       
+       /**
+        * Gets the number of bytes written in the latest memory segment.
+        * 
+        * @return The number of bytes written in the latest memory segment.
+        */
+       public int getBytesInLatestSegment() {
+               return bytesInLatestSegment;
+       }
+       
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current, int 
posInSegment) throws IOException {
+               if (current != null) {
+                       writeSegment(current, posInSegment);
+               }
+               return writer.getNextReturnedSegment();
+       }
+       
+       private void writeSegment(MemorySegment segment, int writePosition) 
throws IOException {
+               writer.writeBlock(segment);
+               numBlocksWritten++;
+               bytesInLatestSegment = writePosition;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
new file mode 100644
index 0000000..e97a1ff
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.MathUtils;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a 
{@link BlockChannelReader},
+ * making it effectively a data input stream. The view reads it data in blocks 
from the underlying channel.
+ * The view can read data that has been written by a {@link 
FileChannelOutputView}, or that was written in blocks
+ * in another fashion.
+ */
+public class SeekableFileChannelInputView extends AbstractPagedInputView {
+       
+       private BlockChannelReader reader;
+       
+       private final IOManager ioManager;
+       
+       private final FileIOChannel.ID channelId;
+       
+       private final MemoryManager memManager;
+       
+       private final List<MemorySegment> memory;
+       
+       private final int sizeOfLastBlock;
+       
+       private final int numBlocksTotal;
+       
+       private final int segmentSize;
+       
+       private int numRequestsRemaining;
+       
+       private int numBlocksRemaining;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public SeekableFileChannelInputView(IOManager ioManager, 
FileIOChannel.ID channelId, MemoryManager memManager, List<MemorySegment> 
memory, int sizeOfLastBlock) throws IOException {
+               super(0);
+               
+               checkNotNull(ioManager);
+               checkNotNull(channelId);
+               checkNotNull(memManager);
+               checkNotNull(memory);
+               
+               this.ioManager = ioManager;
+               this.channelId = channelId;
+               this.memManager = memManager;
+               this.memory = memory;
+               this.sizeOfLastBlock = sizeOfLastBlock;
+               this.segmentSize = memManager.getPageSize();
+               
+               this.reader = ioManager.createBlockChannelReader(channelId);
+               
+               try {
+                       final long channelLength = reader.getSize();
+                       
+                       final int blockCount =  
MathUtils.checkedDownCast(channelLength / segmentSize);
+                       this.numBlocksTotal = (channelLength % segmentSize == 
0) ? blockCount : blockCount + 1;
+
+                       this.numBlocksRemaining = this.numBlocksTotal;
+                       this.numRequestsRemaining = numBlocksRemaining;
+                       
+                       for (int i = 0; i < memory.size(); i++) {
+                               sendReadRequest(memory.get(i));
+                       }
+                       
+                       advance();
+               }
+               catch (IOException e) {
+                       memManager.release(memory);
+                       throw e;
+               }
+       }
+       
+       public void seek(long position) throws IOException {
+               final int block = MathUtils.checkedDownCast(position / 
segmentSize);
+               final int positionInBlock = (int) (position % segmentSize);
+               
+               if (position < 0 || block >= numBlocksTotal || (block == 
numBlocksTotal - 1 && positionInBlock > sizeOfLastBlock)) {
+                       throw new IllegalArgumentException("Position is out of 
range");
+               }
+               
+               clear();
+               if (reader != null) {
+                       reader.close();
+               }
+               
+               reader = ioManager.createBlockChannelReader(channelId);
+               
+               if (block > 0) {
+                       reader.seekToPosition(block * segmentSize);
+               }
+               
+               this.numBlocksRemaining = this.numBlocksTotal - block;
+               this.numRequestsRemaining = numBlocksRemaining;
+               
+               for (int i = 0; i < memory.size(); i++) {
+                       sendReadRequest(memory.get(i));
+               }
+               
+               numBlocksRemaining--;
+               seekInput(reader.getNextReturnedSegment(), positionInBlock, 
numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
+       }
+       
+       public void close() throws IOException {
+               close(false);
+       }
+       
+       public void closeAndDelete() throws IOException {
+               close(true);
+       }
+       
+       private void close(boolean deleteFile) throws IOException {
+               try {
+                       clear();
+                       if (deleteFile) {
+                               reader.closeAndDelete();
+                       } else {
+                               reader.close();
+                       }
+               } finally {
+                       synchronized (memory) {
+                               memManager.release(memory);
+                               memory.clear();
+                       }
+               }
+       }
+       
+       @Override
+       protected MemorySegment nextSegment(MemorySegment current) throws 
IOException {
+               // check for end-of-stream
+               if (numBlocksRemaining <= 0) {
+                       reader.close();
+                       throw new EOFException();
+               }
+               
+               // send a request first. if we have only a single segment, this 
same segment will be the one obtained in the next lines
+               if (current != null) {
+                       sendReadRequest(current);
+               }
+               
+               // get the next segment
+               numBlocksRemaining--;
+               return reader.getNextReturnedSegment();
+       }
+       
+       @Override
+       protected int getLimitForSegment(MemorySegment segment) {
+               return numBlocksRemaining > 0 ? segment.size() : 
sizeOfLastBlock;
+       }
+       
+       private void sendReadRequest(MemorySegment seg) throws IOException {
+               if (numRequestsRemaining > 0) {
+                       reader.readBlock(seg);
+                       numRequestsRemaining--;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
index ecb794e..3991167 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -64,17 +64,18 @@ public abstract class AbstractFileIOChannel implements 
FileIOChannel {
        
        // 
--------------------------------------------------------------------------------------------
 
-       /**
-        * Gets the channel ID of this channel.
-        * 
-        * @return This channel's ID.
-        */
        @Override
        public final FileIOChannel.ID getChannelID() {
                return this.id;
        }
        
        @Override
+       public long getSize() throws IOException {
+               FileChannel channel = fileChannel;
+               return channel == null ? 0 : channel.size();
+       }
+       
+       @Override
        public abstract boolean isClosed();
        
        @Override
@@ -103,4 +104,4 @@ public abstract class AbstractFileIOChannel implements 
FileIOChannel {
                        deleteChannel();
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
index a15acb5..acfa71f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -115,4 +115,9 @@ public class AsynchronousBlockReader extends 
AsynchronousFileIOChannel<MemorySeg
        public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
                return this.returnSegments;
        }
-}
+       
+       @Override
+       public void seekToPosition(long position) throws IOException {
+               this.requestQueue.add(new SeekRequest(this, position));
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 89ebb25..9a9ee61 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -72,7 +72,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends 
IORequest> extends
         * @throws IOException Thrown, if the channel could no be opened.
         */
        protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, 
RequestQueue<R> requestQueue, 
-                       RequestDoneCallback<T> callback, boolean writeEnabled) 
throws IOException
+                       RequestDoneCallback callback, boolean writeEnabled) 
throws IOException
        {
                super(channelID, writeEnabled);
 
@@ -113,7 +113,9 @@ public abstract class AsynchronousFileIOChannel<T, R 
extends IORequest> extends
                                                this.closeLock.wait(1000);
                                                checkErroneous();
                                        }
-                                       catch (InterruptedException ignored) {}
+                                       catch (InterruptedException iex) {
+                                               throw new IOException("Closing 
of asynchronous file channel was interrupted.");
+                                       }
                                }
                        }
                        finally {
@@ -181,13 +183,11 @@ public abstract class AsynchronousFileIOChannel<T, R 
extends IORequest> extends
                        }
                }
                finally {
-                       // decrement the number of missing buffers. If we are 
currently closing, notify the 
-                       if (this.closed) {
-                               synchronized (this.closeLock) {
-                                       int num = 
this.requestsNotReturned.decrementAndGet();
-                                       if (num == 0) {
-                                               this.closeLock.notifyAll();
-                                       }
+                       // decrement the number of missing buffers. If we are 
currently closing, notify the waiters
+                       synchronized (this.closeLock) {
+                               final int num = 
this.requestsNotReturned.decrementAndGet();
+                               if (this.closed && num == 0) {
+                                       this.closeLock.notifyAll();
                                }
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f25827a..8f7f218 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -60,5 +60,12 @@ public interface BlockChannelReader extends FileIOChannel {
         * @return The queue with the full memory segments.
         */
        LinkedBlockingQueue<MemorySegment> getReturnQueue();
+       
+       /**
+        * Seeks the underlying file channel to the given position.
+        * 
+        * @param position The position to seek to.
+        */
+       void seekToPosition(long position) throws IOException;
 }
        
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index 7c9d31b..d6f4458 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -38,6 +38,13 @@ public interface FileIOChannel {
        FileIOChannel.ID getChannelID();
        
        /**
+        * Gets the size (in bytes) of the file underlying the channel.
+        * 
+        * @return The size (in bytes) of the file underlying the channel.
+        */
+       long getSize() throws IOException;
+       
+       /**
         * Checks whether the channel has been closed.
         * 
         * @return True if the channel has been closed, false otherwise.
@@ -153,4 +160,4 @@ public interface FileIOChannel {
                        return new ID(String.format(FORMAT, 
this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 7de8651..6489396 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -231,6 +231,18 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                checkState(!shutdown, "I/O-Manger is closed.");
                return new AsynchronousBulkBlockReader(channelID, 
this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
        }
+       
+       // 
-------------------------------------------------------------------------
+       //                             For Testing
+       // 
-------------------------------------------------------------------------
+       
+       RequestQueue<ReadRequest> getReadRequestQueue(FileIOChannel.ID 
channelID) {
+               return this.readers[channelID.getThreadNum()].requestQueue;
+       }
+       
+       RequestQueue<WriteRequest> getWriteRequestQueue(FileIOChannel.ID 
channelID) {
+               return this.writers[channelID.getThreadNum()].requestQueue;
+       }
 
        // 
-------------------------------------------------------------------------
        //                           I/O Worker Threads
@@ -446,4 +458,4 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                }
                
        }; // end writer thread
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
index 875b223..1ab6931 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
@@ -119,8 +119,7 @@ public interface MemoryManager {
        void shutdown();
        
        /**
-        * Checks if the memory manager all memory available and the 
descriptors of the free segments
-        * describe a contiguous memory layout.
+        * Checks if the memory manager all memory available.
         * 
         * @return True, if the memory manager is empty and valid, false if it 
is not empty or corrupted.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
new file mode 100644
index 0000000..27928a9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.PairGenerator;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.Pair;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.ValueMode;
+
+import java.io.EOFException;
+import java.util.List;
+
+public class FileChannelStreamsITCase {
+       
+       private static final long SEED = 649180756312423613L;
+
+       private static final int KEY_MAX = Integer.MAX_VALUE;
+
+       private static final int VALUE_SHORT_LENGTH = 114;
+       
+       private static final int VALUE_LONG_LENGTH = 112 * 1024;
+
+       private static final int NUM_PAIRS_SHORT = 1000000;
+       
+       private static final int NUM_PAIRS_LONG = 3000;
+       
+       private static final int MEMORY_PAGE_SIZE = 32 * 1024;
+       
+       private static final int NUM_MEMORY_SEGMENTS = 3;
+
+       private IOManager ioManager;
+
+       private MemoryManager memManager;
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Before
+       public void beforeTest() {
+               memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * 
MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE);
+               ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               ioManager.shutdown();
+               assertTrue("I/O Manager was not properly shut down.", 
ioManager.isProperlyShutDown());
+               assertTrue("The memory has not been properly released", 
memManager.verifyEmpty());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Test
+       public void testWriteReadSmallRecords() {
+               try {
+                       List<MemorySegment> memory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final PairGenerator generator = new PairGenerator(SEED, 
KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final FileIOChannel.ID channel = 
ioManager.createChannel();
+                       
+                       // create the writer output view
+                       final BlockChannelWriter writer = 
ioManager.createBlockChannelWriter(channel);
+                       final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+                       
+                       // write a number of pairs
+                       Pair pair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               pair.write(outView);
+                       }
+                       outView.close();
+                       
+                       // create the reader input view
+                       List<MemorySegment> readMemory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       final FileChannelInputView inView = new 
FileChannelInputView(reader, memManager, readMemory, 
outView.getBytesInLatestSegment());
+                       generator.reset();
+                       
+                       // read and re-generate all records and compare them
+                       Pair readPair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               readPair.read(inView);
+                               assertEquals("The re-generated and the read 
record do not match.", pair, readPair);
+                       }
+                       
+                       inView.close();
+                       reader.deleteChannel();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testWriteAndReadLongRecords() {
+               try {
+                       final List<MemorySegment> memory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final PairGenerator generator = new PairGenerator(SEED, 
KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final FileIOChannel.ID channel = 
this.ioManager.createChannel();
+                       
+                       // create the writer output view
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channel);
+                       final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+                       
+                       // write a number of pairs
+                       Pair pair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_LONG; i++) {
+                               generator.next(pair);
+                               pair.write(outView);
+                       }
+                       outView.close();
+                       
+                       // create the reader input view
+                       List<MemorySegment> readMemory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       final FileChannelInputView inView = new 
FileChannelInputView(reader, memManager, readMemory, 
outView.getBytesInLatestSegment());
+                       generator.reset();
+                       
+                       // read and re-generate all records and compare them
+                       Pair readPair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_LONG; i++) {
+                               generator.next(pair);
+                               readPair.read(inView);
+                               assertEquals("The re-generated and the read 
record do not match.", pair, readPair);
+                       }
+                       
+                       inView.close();
+                       reader.deleteChannel();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testReadTooMany() {
+               try {
+                       final List<MemorySegment> memory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final PairGenerator generator = new PairGenerator(SEED, 
KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final FileIOChannel.ID channel = 
this.ioManager.createChannel();
+                       
+                       // create the writer output view
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channel);
+                       final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+       
+                       // write a number of pairs
+                       Pair pair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               pair.write(outView);
+                       }
+                       outView.close();
+       
+                       // create the reader input view
+                       List<MemorySegment> readMemory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       final FileChannelInputView inView = new 
FileChannelInputView(reader, memManager, readMemory, 
outView.getBytesInLatestSegment());
+                       generator.reset();
+       
+                       // read and re-generate all records and compare them
+                       try {
+                               Pair readPair = new Pair();
+                               for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
+                                       generator.next(pair);
+                                       readPair.read(inView);
+                                       assertEquals("The re-generated and the 
read record do not match.", pair, readPair);
+                               }
+                               fail("Expected an EOFException which did not 
occur.");
+                       }
+                       catch (EOFException eofex) {
+                               // expected
+                       }
+                       
+                       inView.close();
+                       reader.deleteChannel();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testWriteReadOneBufferOnly() {
+               try {
+                       final List<MemorySegment> memory = 
memManager.allocatePages(new DummyInvokable(), 1);
+                       
+                       final PairGenerator generator = new PairGenerator(SEED, 
KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final FileIOChannel.ID channel = 
this.ioManager.createChannel();
+                       
+                       // create the writer output view
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channel);
+                       final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+                       
+                       // write a number of pairs
+                       Pair pair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               pair.write(outView);
+                       }
+                       outView.close();
+                       
+                       // create the reader input view
+                       List<MemorySegment> readMemory = 
memManager.allocatePages(new DummyInvokable(), 1);
+                       
+                       final BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       final FileChannelInputView inView = new 
FileChannelInputView(reader, memManager, readMemory, 
outView.getBytesInLatestSegment());
+                       generator.reset();
+                       
+                       // read and re-generate all records and compare them
+                       Pair readPair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               readPair.read(inView);
+                               assertEquals("The re-generated and the read 
record do not match.", pair, readPair);
+                       }
+                       
+                       inView.close();
+                       reader.deleteChannel();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testWriteReadNotAll() {
+               try {
+                       final List<MemorySegment> memory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final PairGenerator generator = new PairGenerator(SEED, 
KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+                       final FileIOChannel.ID channel = 
this.ioManager.createChannel();
+                       
+                       // create the writer output view
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channel);
+                       final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+                       
+                       // write a number of pairs
+                       Pair pair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+                               generator.next(pair);
+                               pair.write(outView);
+                       }
+                       outView.close();
+                       
+                       // create the reader input view
+                       List<MemorySegment> readMemory = 
memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+                       
+                       final BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       final FileChannelInputView inView = new 
FileChannelInputView(reader, memManager, readMemory, 
outView.getBytesInLatestSegment());
+                       generator.reset();
+                       
+                       // read and re-generate all records and compare them
+                       Pair readPair = new Pair();
+                       for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
+                               generator.next(pair);
+                               readPair.read(inView);
+                               assertEquals("The re-generated and the read 
record do not match.", pair, readPair);
+                       }
+                       
+                       inView.close();
+                       reader.deleteChannel();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
new file mode 100644
index 0000000..1db2a6f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+
+public class FileChannelStreamsTest {
+
+       @Test
+       public void testCloseAndDeleteOutputView() {
+               final IOManager ioManager = new IOManagerAsync();
+               try {
+                       MemoryManager memMan = new DefaultMemoryManager(4 * 
16*1024, 1, 16*1024);
+                       List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
+                       memMan.allocatePages(new DummyInvokable(), memory, 4);
+                       
+                       FileIOChannel.ID channel = ioManager.createChannel();
+                       BlockChannelWriter writer = 
ioManager.createBlockChannelWriter(channel);
+                       
+                       FileChannelOutputView out = new 
FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
+                       new StringValue("Some test text").write(out);
+                       
+                       // close for the first time, make sure all memory 
returns
+                       out.close();
+                       assertTrue(memMan.verifyEmpty());
+                       
+                       // close again, should not cause an exception
+                       out.close();
+                       
+                       // delete, make sure file is removed
+                       out.closeAndDelete();
+                       assertFalse(new File(channel.getPath()).exists());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       ioManager.shutdown();
+               }
+       }
+       
+       @Test
+       public void testCloseAndDeleteInputView() {
+               final IOManager ioManager = new IOManagerAsync();
+               try {
+                       MemoryManager memMan = new DefaultMemoryManager(4 * 
16*1024, 1, 16*1024);
+                       List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
+                       memMan.allocatePages(new DummyInvokable(), memory, 4);
+                       
+                       FileIOChannel.ID channel = ioManager.createChannel();
+                       
+                       // add some test data
+                       {
+                               FileWriter wrt = new 
FileWriter(channel.getPath());
+                               wrt.write("test data");
+                               wrt.close();
+                       }
+                       
+                       BlockChannelReader reader = 
ioManager.createBlockChannelReader(channel);
+                       FileChannelInputView in = new 
FileChannelInputView(reader, memMan, memory, 9);
+                       
+                       // read just something
+                       in.readInt();
+                       
+                       // close for the first time, make sure all memory 
returns
+                       in.close();
+                       assertTrue(memMan.verifyEmpty());
+                       
+                       // close again, should not cause an exception
+                       in.close();
+                       
+                       // delete, make sure file is removed
+                       in.closeAndDelete();
+                       assertFalse(new File(channel.getPath()).exists());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       ioManager.shutdown();
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
new file mode 100644
index 0000000..7e4d70d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.junit.Test;
+
+
+public class SeekableFileChannelInputViewTest {
+
+       @Test
+       public void testSeek() {
+               final IOManager ioManager = new IOManagerAsync();
+               final int PAGE_SIZE = 16 * 1024;
+               final int NUM_RECORDS = 120000;
+               // integers across 7.x pages (7 pages = 114.688 bytes, 8 pages 
= 131.072 bytes)
+               
+               try {
+                       MemoryManager memMan = new DefaultMemoryManager(4 * 
PAGE_SIZE, 1, PAGE_SIZE);
+                       List<MemorySegment> memory = new 
ArrayList<MemorySegment>();
+                       memMan.allocatePages(new DummyInvokable(), memory, 4);
+                       
+                       FileIOChannel.ID channel = ioManager.createChannel();
+                       BlockChannelWriter writer = 
ioManager.createBlockChannelWriter(channel);
+                       FileChannelOutputView out = new 
FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
+                       
+                       // write some integers across 7.5 pages (7 pages = 
114.688 bytes, 8 pages = 131.072 bytes)
+                       for (int i = 0; i < NUM_RECORDS; i += 4) {
+                               out.writeInt(i);
+                       }
+                       // close for the first time, make sure all memory 
returns
+                       out.close();
+                       assertTrue(memMan.verifyEmpty());
+                       
+                       memMan.allocatePages(new DummyInvokable(), memory, 4);
+                       SeekableFileChannelInputView in = new 
SeekableFileChannelInputView(ioManager, channel, memMan, memory, 
out.getBytesInLatestSegment());
+                       
+                       // read first, complete
+                       for (int i = 0; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to the middle of the 3rd page
+                       int i = 2 * PAGE_SIZE + PAGE_SIZE / 4;
+                       in.seek(i);
+                       for (; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to the end
+                       i = 120000 - 4;
+                       in.seek(i);
+                       for (; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to the beginning
+                       i = 0;
+                       in.seek(i);
+                       for (; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to after a page
+                       i = PAGE_SIZE;
+                       in.seek(i);
+                       for (; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to after a page
+                       i = 3 * PAGE_SIZE;
+                       in.seek(i);
+                       for (; i < NUM_RECORDS; i += 4) {
+                               assertEquals(i, in.readInt());
+                       }
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek to the end
+                       i = NUM_RECORDS;
+                       in.seek(i);
+                       try {
+                               in.readInt();
+                               fail("should throw EOF exception");
+                       } catch (EOFException e) {}
+                       
+                       // seek out of bounds
+                       try {
+                               in.seek(-10);
+                               fail("should throw an exception");
+                       } catch (IllegalArgumentException e) {}
+                       try {
+                               in.seek(NUM_RECORDS + 1);
+                               fail("should throw an exception");
+                       } catch (IllegalArgumentException e) {}
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       ioManager.shutdown();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
new file mode 100644
index 0000000..1e9d4d4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.Test;
+
+public class AsynchronousFileIOChannelsTest {
+
+       @Test
+       public void testClosingWaits() {
+               IOManagerAsync ioMan = new IOManagerAsync();
+               try {
+                       
+                       final int NUM_BLOCKS = 100;
+                       final MemorySegment seg = new MemorySegment(new byte[32 
* 1024]);
+                       
+                       final AtomicInteger callbackCounter = new 
AtomicInteger();
+                       final AtomicBoolean exceptionOccurred = new 
AtomicBoolean();
+                       
+                       final RequestDoneCallback callback = new 
RequestDoneCallback() {
+                               
+                               @Override
+                               public void requestSuccessful(MemorySegment 
buffer) {
+                                       // we do the non safe variant. the 
callbacks should come in order from
+                                       // the same thread, so it should always 
work
+                                       
callbackCounter.set(callbackCounter.get() + 1);
+                                       
+                                       if (buffer != seg) {
+                                               exceptionOccurred.set(true);
+                                       }
+                               }
+                               
+                               @Override
+                               public void requestFailed(MemorySegment buffer, 
IOException e) {
+                                       exceptionOccurred.set(true);
+                               }
+                       };
+                       
+                       BlockChannelWriterWithCallback writer = 
ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
+                       try {
+                               for (int i = 0; i < NUM_BLOCKS; i++) {
+                                       writer.writeBlock(seg);
+                               }
+                               
+                               writer.close();
+                               
+                               assertEquals(NUM_BLOCKS, callbackCounter.get());
+                               assertFalse(exceptionOccurred.get());
+                       }
+                       finally {
+                               writer.closeAndDelete();
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       ioMan.shutdown();
+               }
+       }
+       
+       @Test
+       public void testExceptionForwardsToClose() {
+               IOManagerAsync ioMan = new IOManagerAsync();
+               try {
+                       testExceptionForwardsToClose(ioMan, 100, 1);
+                       testExceptionForwardsToClose(ioMan, 100, 50);
+                       testExceptionForwardsToClose(ioMan, 100, 100);
+               }
+               finally {
+                       ioMan.shutdown();
+               }
+       }
+       
+       private void testExceptionForwardsToClose(IOManagerAsync ioMan, final 
int numBlocks, final int failingBlock) {
+               try {
+                       MemorySegment seg = new MemorySegment(new byte[32 * 
1024]);
+                       FileIOChannel.ID channelId = ioMan.createChannel();
+                       
+                       BlockChannelWriterWithCallback writer = new 
AsynchronousBlockWriterWithCallback(channelId, 
+                                       ioMan.getWriteRequestQueue(channelId), 
new NoOpCallback()) {
+                               
+                               private int numBlocks;
+                               
+                               @Override
+                               public void writeBlock(MemorySegment segment) 
throws IOException {
+                                       numBlocks++;
+                                       
+                                       if (numBlocks == failingBlock) {
+                                               
this.requestsNotReturned.incrementAndGet();
+                                               this.requestQueue.add(new 
FailingWriteRequest(this, segment));
+                                       } else {
+                                               super.writeBlock(segment);
+                                       }
+                               }
+                       };
+                       
+                       try {
+                               for (int i = 0; i < numBlocks; i++) {
+                                       writer.writeBlock(seg);
+                               }
+                               
+                               writer.close();
+                               fail("did not forward exception");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+                       finally {
+                               try {
+                                       writer.closeAndDelete();
+                               } catch (Throwable t) {}
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       private static class NoOpCallback implements RequestDoneCallback {
+
+               @Override
+               public void requestSuccessful(MemorySegment buffer) {}
+
+               @Override
+               public void requestFailed(MemorySegment buffer, IOException e) 
{}
+       }
+       
+       private static class FailingWriteRequest implements WriteRequest {
+               
+               private final AsynchronousFileIOChannel<WriteRequest> channel;
+               
+               private final MemorySegment segment;
+               
+               protected 
FailingWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, 
MemorySegment segment) {
+                       this.channel = targetChannel;
+                       this.segment = segment;
+               }
+
+               @Override
+               public void write() throws IOException {
+                       throw new IOException();
+               }
+
+               @Override
+               public void requestDone(IOException ioex) {
+                       this.channel.handleProcessedBuffer(this.segment, ioex);
+               }
+       } 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
new file mode 100644
index 0000000..297eeed
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
+import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
+
+public class IOManagerAsyncTest {
+       
+       private IOManagerAsync ioManager;
+       
+       // 
------------------------------------------------------------------------
+       //                           Setup & Shutdown
+       // 
------------------------------------------------------------------------
+       
+       @Before
+       public void beforeTest() {
+               ioManager = new IOManagerAsync();
+       }
+
+       @After
+       public void afterTest() {
+               this.ioManager.shutdown();
+               assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
+       }
+
+       // 
------------------------------------------------------------------------
+       //                           Test Methods
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void channelReadWriteOneSegment() {
+               final int NUM_IOS = 1111;
+               
+               try {
+                       final FileIOChannel.ID channelID = 
this.ioManager.createChannel();
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channelID);
+                       
+                       MemorySegment memSeg = new MemorySegment(new byte[32 * 
1024]);
+                       
+                       for (int i = 0; i < NUM_IOS; i++) {
+                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
+                                       memSeg.putInt(pos, i);
+                               }
+                               
+                               writer.writeBlock(memSeg);
+                               memSeg = writer.getNextReturnedSegment();
+                       }
+                       
+                       writer.close();
+                       
+                       final BlockChannelReader reader = 
this.ioManager.createBlockChannelReader(channelID);
+                       for (int i = 0; i < NUM_IOS; i++) {
+                               reader.readBlock(memSeg);
+                               memSeg = reader.getNextReturnedSegment();
+                               
+                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
+                                       if (memSeg.getInt(pos) != i) {
+                                               fail("Read memory segment 
contains invalid data.");
+                                       }
+                               }
+                       }
+                       
+                       reader.closeAndDelete();
+               }
+               catch (Exception ex) {
+                       ex.printStackTrace();
+                       fail("Test encountered an exception: " + 
ex.getMessage());
+               }
+       }
+       
+       @Test
+       public void channelReadWriteMultipleSegments() {
+               final int NUM_IOS = 1111;
+               final int NUM_SEGS = 16;
+               
+               try {
+                       final List<MemorySegment> memSegs = new 
ArrayList<MemorySegment>();
+                       for (int i = 0; i < NUM_SEGS; i++) {
+                               memSegs.add(new MemorySegment(new byte[32 * 
1024]));
+                       }
+                       
+                       final FileIOChannel.ID channelID = 
this.ioManager.createChannel();
+                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channelID);
+                       
+                       for (int i = 0; i < NUM_IOS; i++) {
+                               final MemorySegment memSeg = memSegs.isEmpty() 
? writer.getNextReturnedSegment() : memSegs.remove(memSegs.size() - 1);
+                               
+                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
+                                       memSeg.putInt(pos, i);
+                               }
+                               
+                               writer.writeBlock(memSeg);
+                       }
+                       writer.close();
+                       
+                       // get back the memory
+                       while (memSegs.size() < NUM_SEGS) {
+                               memSegs.add(writer.getNextReturnedSegment());
+                       }
+                       
+                       final BlockChannelReader reader = 
this.ioManager.createBlockChannelReader(channelID);
+                       while(!memSegs.isEmpty()) {
+                               reader.readBlock(memSegs.remove(0));
+                       }
+                       
+                       for (int i = 0; i < NUM_IOS; i++) {
+                               final MemorySegment memSeg = 
reader.getNextReturnedSegment();
+                               
+                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
+                                       if (memSeg.getInt(pos) != i) {
+                                               fail("Read memory segment 
contains invalid data.");
+                                       }
+                               }
+                               reader.readBlock(memSeg);
+                       }
+                       
+                       reader.closeAndDelete();
+                       
+                       // get back the memory
+                       while (memSegs.size() < NUM_SEGS) {
+                               memSegs.add(reader.getNextReturnedSegment());
+                       }
+               }
+               catch (Exception ex) {
+                       ex.printStackTrace();
+                       fail("TEst encountered an exception: " + 
ex.getMessage());
+               }
+       }
+       
+       @Test
+       public void testExceptionPropagationReader() {
+               try {
+                       // use atomic boolean as a boolean reference
+                       final AtomicBoolean handlerCalled = new AtomicBoolean();
+                       final AtomicBoolean exceptionForwarded = new 
AtomicBoolean();
+                       
+                       ReadRequest req = new ReadRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       if (ioex instanceof TestIOException) {
+                                               exceptionForwarded.set(true);
+                                       }
+                                       
+                                       synchronized (handlerCalled) {
+                                               handlerCalled.set(true);
+                                               handlerCalled.notifyAll();
+                                       }
+                               }
+                               
+                               @Override
+                               public void read() throws IOException {
+                                       throw new TestIOException();
+                               }
+                       };
+                       
+                       
+                       // test the read queue
+                       RequestQueue<ReadRequest> rq = 
ioManager.getReadRequestQueue(ioManager.createChannel());
+                       rq.add(req);
+
+                       // wait until the asynchronous request has been handled
+                       synchronized (handlerCalled) {
+                               while (!handlerCalled.get()) {
+                                       handlerCalled.wait();
+                               }
+                       }
+                       
+                       assertTrue(exceptionForwarded.get());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testExceptionPropagationWriter() {
+               try {
+                       // use atomic boolean as a boolean reference
+                       final AtomicBoolean handlerCalled = new AtomicBoolean();
+                       final AtomicBoolean exceptionForwarded = new 
AtomicBoolean();
+                       
+                       WriteRequest req = new WriteRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       if (ioex instanceof TestIOException) {
+                                               exceptionForwarded.set(true);
+                                       }
+                                       
+                                       synchronized (handlerCalled) {
+                                               handlerCalled.set(true);
+                                               handlerCalled.notifyAll();
+                                       }
+                               }
+                               
+                               @Override
+                               public void write() throws IOException {
+                                       throw new TestIOException();
+                               }
+                       };
+                       
+                       
+                       // test the read queue
+                       RequestQueue<WriteRequest> rq = 
ioManager.getWriteRequestQueue(ioManager.createChannel());
+                       rq.add(req);
+
+                       // wait until the asynchronous request has been handled
+                       synchronized (handlerCalled) {
+                               while (!handlerCalled.get()) {
+                                       handlerCalled.wait();
+                               }
+                       }
+                       
+                       assertTrue(exceptionForwarded.get());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testExceptionInCallbackRead() {
+               try {
+                       final AtomicBoolean handlerCalled = new AtomicBoolean();
+                       
+                       ReadRequest regularRequest = new ReadRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       synchronized (handlerCalled) {
+                                               handlerCalled.set(true);
+                                               handlerCalled.notifyAll();
+                                       }
+                               }
+                               
+                               @Override
+                               public void read() {}
+                       };
+                       
+                       ReadRequest exceptionThrower = new ReadRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       throw new RuntimeException();
+                               }
+                               
+                               @Override
+                               public void read() {}
+                       };
+                       
+                       RequestQueue<ReadRequest> rq = 
ioManager.getReadRequestQueue(ioManager.createChannel());
+                       
+                       // queue first an exception thrower, then a regular 
request.
+                       // we check that the regular request gets successfully 
handled
+                       rq.add(exceptionThrower);
+                       rq.add(regularRequest);
+                       
+                       synchronized (handlerCalled) {
+                               while (!handlerCalled.get()) {
+                                       handlerCalled.wait();
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testExceptionInCallbackWrite() {
+               try {
+                       final AtomicBoolean handlerCalled = new AtomicBoolean();
+                       
+                       WriteRequest regularRequest = new WriteRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       synchronized (handlerCalled) {
+                                               handlerCalled.set(true);
+                                               handlerCalled.notifyAll();
+                                       }
+                               }
+                               
+                               @Override
+                               public void write() {}
+                       };
+                       
+                       WriteRequest exceptionThrower = new WriteRequest() {
+                               
+                               @Override
+                               public void requestDone(IOException ioex) {
+                                       throw new RuntimeException();
+                               }
+                               
+                               @Override
+                               public void write() {}
+                       };
+                       
+                       RequestQueue<WriteRequest> rq = 
ioManager.getWriteRequestQueue(ioManager.createChannel());
+                       
+                       // queue first an exception thrower, then a regular 
request.
+                       // we check that the regular request gets successfully 
handled
+                       rq.add(exceptionThrower);
+                       rq.add(regularRequest);
+                       
+                       synchronized (handlerCalled) {
+                               while (!handlerCalled.get()) {
+                                       handlerCalled.wait();
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       
+       
+       final class TestIOException extends IOException {
+               private static final long serialVersionUID = 
-814705441998024472L;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 78951d3..f1d5337 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.Random;
 
 import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -36,22 +36,17 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 
 /**
  * Integration test case for the I/O manager.
  */
 public class IOManagerITCase {
        
-       private static final Logger LOG = 
LoggerFactory.getLogger(IOManagerITCase.class);
-       
        private static final long SEED = 649180756312423613L;
 
-       private static final int NUMBER_OF_SEGMENTS = 10; // 10
-
-       private static final int SEGMENT_SIZE = 1024 * 1024; // 1M
+       private static final int MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL = 10;
+       
+       private static final int MEMORY_SIZE = 10 * 1024 * 1024; // 10 MB
        
        private final int NUM_CHANNELS = 29;
        
@@ -63,7 +58,7 @@ public class IOManagerITCase {
 
        @Before
        public void beforeTest() {
-               memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * 
SEGMENT_SIZE, 1);
+               memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
                ioManager = new IOManagerAsync();
        }
 
@@ -84,10 +79,7 @@ public class IOManagerITCase {
         * parallel. It is designed to check the ability of the IO manager to 
correctly handle multiple threads.
         */
        @Test
-       public void parallelChannelsTest() throws Exception
-       {
-               LOG.info("Starting parallel channels test.");
-               
+       public void parallelChannelsTest() throws Exception {
                final Random rnd = new Random(SEED);
                final AbstractInvokable memOwner = new 
DefaultMemoryManagerTest.DummyInvokable();
                
@@ -106,7 +98,7 @@ public class IOManagerITCase {
                        ids[i] = this.ioManager.createChannel();
                        writers[i] = 
this.ioManager.createBlockChannelWriter(ids[i]);
                        
-                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) 
+ 2);
+                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, 
rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
                        outs[i] = new ChannelWriterOutputView(writers[i], 
memSegs, this.memoryManager.getPageSize());
                }
                
@@ -114,24 +106,13 @@ public class IOManagerITCase {
                Value val = new Value();
                
                // write a lot of values unevenly distributed over the channels
-               int nextLogCount = 0;
-               float nextLogFraction = 0.0f;
                
-               LOG.info("Writing to channels...");
                for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) {
-                       
-                       if (i == nextLogCount) {
-                               LOG.info("... " + (int) (nextLogFraction * 100) 
+ "% done.");
-                               nextLogFraction += 0.05;
-                               nextLogCount = (int) (nextLogFraction * 
NUMBERS_TO_BE_WRITTEN);
-                       }
-                       
                        int channel = skewedSample(rnd, NUM_CHANNELS - 1);
                        
                        val.value = String.valueOf(writingCounters[channel]++);
                        val.write(outs[channel]);
                }
-               LOG.info("Writing done, flushing contents...");
                
                // close all writers
                for (int i = 0; i < NUM_CHANNELS; i++) {
@@ -141,12 +122,9 @@ public class IOManagerITCase {
                writers = null;
                
                // instantiate the readers for sequential read
-               LOG.info("Reading channels sequentially...");
-               for (int i = 0; i < NUM_CHANNELS; i++)
-               {
-                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) 
+ 2);
+               for (int i = 0; i < NUM_CHANNELS; i++) {
                        
-                       LOG.info("Reading channel " + (i+1) + "/" + 
NUM_CHANNELS + '.');
+                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, 
rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
                                
                        final BlockChannelReader reader = 
this.ioManager.createBlockChannelReader(ids[i]);
                        final ChannelReaderInputView in = new 
ChannelReaderInputView(reader, memSegs, false);
@@ -173,30 +151,19 @@ public class IOManagerITCase {
                        
                        this.memoryManager.release(in.close());
                }
-               LOG.info("Sequential reading done.");
                
                // instantiate the readers
-               LOG.info("Reading channels randomly...");
                for (int i = 0; i < NUM_CHANNELS; i++) {
                        
-                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) 
+ 2);
+                       List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(memOwner, 
rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
                                
                        readers[i] = 
this.ioManager.createBlockChannelReader(ids[i]);
                        ins[i] = new ChannelReaderInputView(readers[i], 
memSegs, false);
                }
                
-               nextLogCount = 0;
-               nextLogFraction = 0.0f;
-               
                // read a lot of values in a mixed order from the channels
                for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) {
                        
-                       if (i == nextLogCount) {
-                               LOG.info("... " + (int) (nextLogFraction * 100) 
+ "% done.");
-                               nextLogFraction += 0.05;
-                               nextLogCount = (int) (nextLogFraction * 
NUMBERS_TO_BE_WRITTEN);
-                       }
-                       
                        while (true) {
                                final int channel = skewedSample(rnd, 
NUM_CHANNELS - 1);
                                if (ins[channel] != null) {
@@ -222,7 +189,6 @@ public class IOManagerITCase {
                        }
                        
                }
-               LOG.info("Random reading done.");
                
                // close all readers
                for (int i = 0; i < NUM_CHANNELS; i++) {
@@ -256,10 +222,9 @@ public class IOManagerITCase {
        
        protected static class Value implements IOReadableWritable {
 
-               String value;
+               private String value;
 
-               public Value() {
-               }
+               public Value() {}
 
                public Value(String val) {
                        this.value = val;
@@ -306,5 +271,4 @@ public class IOManagerITCase {
                        return true;
                }
        }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index fa6cb80..ab5c206 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -16,233 +16,96 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest.DummyInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 
-public class IOManagerTest {
-       
-       // 
------------------------------------------------------------------------
-       //                        Cross Test Fields
-       // 
------------------------------------------------------------------------
-       
-       private IOManager ioManager;
-
-       private DefaultMemoryManager memoryManager;
-       
-       // 
------------------------------------------------------------------------
-       //                           Setup & Shutdown
-       // 
------------------------------------------------------------------------
-       
-       @Before
-       public void beforeTest() {
-               this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 
1);
-               this.ioManager = new IOManagerAsync();
-       }
-
-       @After
-       public void afterTest() {
-               this.ioManager.shutdown();
-               Assert.assertTrue("IO Manager has not properly shut down.", 
ioManager.isProperlyShutDown());
-               
-               Assert.assertTrue("Not all memory was returned to the memory 
manager in the test.", this.memoryManager.verifyEmpty());
-               this.memoryManager.shutdown();
-               this.memoryManager = null;
-       }
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.junit.Test;
 
-       // 
------------------------------------------------------------------------
-       //                           Test Methods
-       // 
------------------------------------------------------------------------
-       
-       // 
------------------------------------------------------------------------
+public class IOManagerTest {
 
-       /**
-        * Tests that the channel enumerator creates channels in the temporary 
files directory.
-        */
        @Test
        public void channelEnumerator() {
-               File tempPath = new File(System.getProperty("java.io.tmpdir")); 
+               File tempPath = new File(System.getProperty("java.io.tmpdir"));
                
-               FileIOChannel.Enumerator enumerator = 
ioManager.createChannelEnumerator();
+               String[] tempDirs = new String[] {
+                       new File(tempPath, "a").getAbsolutePath(),
+                       new File(tempPath, "b").getAbsolutePath(),
+                       new File(tempPath, "c").getAbsolutePath(),
+                       new File(tempPath, "d").getAbsolutePath(),
+                       new File(tempPath, "e").getAbsolutePath(),
+               };
+               
+               int[] counters = new int[tempDirs.length];
+               
+               
+               FileIOChannel.Enumerator enumerator = new 
TestIOManager(tempDirs).createChannelEnumerator();
 
-               for (int i = 0; i < 10; i++) {
+               for (int i = 0; i < 3 * tempDirs.length; i++) {
                        FileIOChannel.ID id = enumerator.next();
                        
                        File path = new File(id.getPath());
-                       Assert.assertTrue("Channel IDs must name an absolute 
path.", path.isAbsolute());
-                       Assert.assertFalse("Channel IDs must name a file, not a 
directory.", path.isDirectory());
-                       Assert.assertTrue("Path is not in the temp directory.", 
tempPath.equals(path.getParentFile()));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Test
-       public void channelReadWriteOneSegment() {
-               final int NUM_IOS = 1111;
-               
-               try {
-                       final FileIOChannel.ID channelID = 
this.ioManager.createChannel();
-                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channelID);
                        
-                       MemorySegment memSeg = 
this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0);
+                       assertTrue("Channel IDs must name an absolute path.", 
path.isAbsolute());
                        
-                       for (int i = 0; i < NUM_IOS; i++) {
-                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
-                                       memSeg.putInt(pos, i);
-                               }
-                               
-                               writer.writeBlock(memSeg);
-                               memSeg = writer.getNextReturnedSegment();
-                       }
+                       assertFalse("Channel IDs must name a file, not a 
directory.", path.isDirectory());
                        
-                       writer.close();
+                       assertTrue("Path is not in the temp directory.", 
tempPath.equals(path.getParentFile().getParentFile()));
                        
-                       final BlockChannelReader reader = 
this.ioManager.createBlockChannelReader(channelID);
-                       for (int i = 0; i < NUM_IOS; i++) {
-                               reader.readBlock(memSeg);
-                               memSeg = reader.getNextReturnedSegment();
-                               
-                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
-                                       if (memSeg.getInt(pos) != i) {
-                                               Assert.fail("Read memory 
segment contains invalid data.");
-                                       }
+                       for (int k = 0; k < tempDirs.length; k++) {
+                               if (path.getParent().equals(tempDirs[k])) {
+                                       counters[k]++;
                                }
                        }
-                       
-                       reader.closeAndDelete();
-                       
-                       this.memoryManager.release(memSeg);
-                       
-               } catch (Exception ex) {
-                       ex.printStackTrace();
-                       Assert.fail("TEst encountered an exception: " + 
ex.getMessage());
                }
-       }
-       
-       @Test
-       public void channelReadWriteMultipleSegments() {
-               final int NUM_IOS = 1111;
-               final int NUM_SEGS = 16;
                
-               try {
-                       final List<MemorySegment> memSegs = 
this.memoryManager.allocatePages(new DummyInvokable(), NUM_SEGS);
-                       final FileIOChannel.ID channelID = 
this.ioManager.createChannel();
-                       final BlockChannelWriter writer = 
this.ioManager.createBlockChannelWriter(channelID);
-                       
-                       for (int i = 0; i < NUM_IOS; i++) {
-                               final MemorySegment memSeg = memSegs.isEmpty() 
? writer.getNextReturnedSegment() : memSegs.remove(0);
-                               
-                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
-                                       memSeg.putInt(pos, i);
-                               }
-                               
-                               writer.writeBlock(memSeg);
-                       }
-                       writer.close();
-                       
-                       // get back the memory
-                       while (memSegs.size() < NUM_SEGS) {
-                               memSegs.add(writer.getNextReturnedSegment());
-                       }
-                       
-                       final BlockChannelReader reader = 
this.ioManager.createBlockChannelReader(channelID);
-                       while(!memSegs.isEmpty()) {
-                               reader.readBlock(memSegs.remove(0));
-                       }
-                       
-                       for (int i = 0; i < NUM_IOS; i++) {
-                               final MemorySegment memSeg = 
reader.getNextReturnedSegment();
-                               
-                               for (int pos = 0; pos < memSeg.size(); pos += 
4) {
-                                       if (memSeg.getInt(pos) != i) {
-                                               Assert.fail("Read memory 
segment contains invalid data.");
-                                       }
-                               }
-                               reader.readBlock(memSeg);
-                       }
-                       
-                       reader.closeAndDelete();
-                       
-                       // get back the memory
-                       while (memSegs.size() < NUM_SEGS) {
-                               memSegs.add(reader.getNextReturnedSegment());
-                       }
-                       
-                       this.memoryManager.release(memSegs);
-                       
-               } catch (Exception ex) {
-                       ex.printStackTrace();
-                       Assert.fail("TEst encountered an exception: " + 
ex.getMessage());
+               for (int k = 0; k < tempDirs.length; k++) {
+                       assertEquals(3, counters[k]);
                }
        }
-
-       // 
============================================================================================
        
-       final class FailingSegmentReadRequest implements ReadRequest {
-               
-               private final AsynchronousFileIOChannel<MemorySegment, 
ReadRequest> channel;
-               
-               private final MemorySegment segment;
-               
-               protected 
FailingSegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> 
targetChannel, MemorySegment segment) {
-                       this.channel = targetChannel;
-                       this.segment = segment;
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static class TestIOManager extends IOManager {
+
+               protected TestIOManager(String[] paths) {
+                       super(paths);
                }
 
+               @Override
+               public void shutdown() {}
 
                @Override
-               public void read() throws IOException {
-                       throw new TestIOException();
+               public boolean isProperlyShutDown() {
+                       return false;
                }
 
-
                @Override
-               public void requestDone(IOException ioex) {
-                       this.channel.handleProcessedBuffer(this.segment, ioex);
+               public BlockChannelWriter createBlockChannelWriter(ID 
channelID, LinkedBlockingQueue<MemorySegment> returnQueue) {
+                       throw new UnsupportedOperationException();
                }
-       }
 
-       
//--------------------------------------------------------------------------------------------
-
-       /**
-        * Special write request that writes an entire memory segment to the 
block writer.
-        */
-       final class FailingSegmentWriteRequest implements WriteRequest {
-               
-               private final AsynchronousFileIOChannel<MemorySegment, 
WriteRequest> channel;
-               
-               private final MemorySegment segment;
-               
-               protected 
FailingSegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, 
WriteRequest> targetChannel, MemorySegment segment) {
-                       this.channel = targetChannel;
-                       this.segment = segment;
+               @Override
+               public BlockChannelWriterWithCallback 
createBlockChannelWriter(ID channelID, RequestDoneCallback callback) {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               public void write() throws IOException {
-                       throw new TestIOException();
+               public BlockChannelReader createBlockChannelReader(ID 
channelID, LinkedBlockingQueue<MemorySegment> returnQueue) {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               public void requestDone(IOException ioex) {
-                       this.channel.handleProcessedBuffer(this.segment, ioex);
+               public BulkBlockChannelReader createBulkBlockChannelReader(ID 
channelID, List<MemorySegment> targetSegments, int numBlocks) {
+                       throw new UnsupportedOperationException();
                }
        }
-       
-       
-       final class TestIOException extends IOException {
-               private static final long serialVersionUID = 
-814705441998024472L;
-       }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
new file mode 100644
index 0000000..951a661
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+public final class PairGenerator {
+
+       public static class Pair implements Value {
+               
+               private static final long serialVersionUID = 1L;
+               
+               private int key;
+               private StringValue value = new StringValue();
+               
+               
+               public Pair() {}
+               
+               
+               public int getKey() {
+                       return key;
+               }
+               
+               public StringValue getValue() {
+                       return value;
+               }
+               
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       out.writeInt(key);
+                       value.write(out);
+               }
+               
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       key = in.readInt();
+                       value.read(in);
+               }
+               
+               @Override
+               public int hashCode() {
+                       return 31 * key + value.hashCode();
+               }
+               
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof Pair) {
+                               Pair other = (Pair) obj;
+                               return other.key == this.key && 
other.value.equals(this.value);
+                       } else {
+                               return false;
+                       }
+               }
+               
+               @Override
+               public String toString() {
+                       return String.format("(%d, %s)", key, value);
+               }
+       }
+       
+       public enum KeyMode {
+               SORTED, RANDOM
+       };
+
+       public enum ValueMode {
+               FIX_LENGTH, RANDOM_LENGTH, CONSTANT
+       };
+
+       private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 
'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+               'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
+
+       private final long seed;
+
+       private final int keyMax;
+
+       private final int valueLength;
+
+       private final KeyMode keyMode;
+
+       private final ValueMode valueMode;
+
+       private Random random;
+
+       private int counter;
+
+       private final StringValue valueConstant;
+
+       
+       public PairGenerator(long seed, int keyMax, int valueLength) {
+               this(seed, keyMax, valueLength, KeyMode.RANDOM, 
ValueMode.FIX_LENGTH);
+       }
+
+       public PairGenerator(long seed, int keyMax, int valueLength, KeyMode 
keyMode, ValueMode valueMode) {
+               this(seed, keyMax, valueLength, keyMode, valueMode, null);
+       }
+       
+       public PairGenerator(long seed, int keyMax, int valueLength, KeyMode 
keyMode, ValueMode valueMode, String constant) {
+               this.seed = seed;
+               this.keyMax = keyMax;
+               this.valueLength = valueLength;
+               this.keyMode = keyMode;
+               this.valueMode = valueMode;
+
+               this.random = new Random(seed);
+               this.counter = 0;
+               
+               this.valueConstant = new StringValue();
+               if (constant != null) {
+                       this.valueConstant.setValue(constant);
+               }
+       }
+
+       public void next(Pair target) {
+               target.key = (keyMode == KeyMode.SORTED ? ++counter : 
Math.abs(random.nextInt() % keyMax) + 1);
+               
+               if (valueMode == ValueMode.CONSTANT) {
+                       target.value = valueConstant;
+               } else {
+                       randomString(target.value);
+               }
+       }
+
+       public void reset() {
+               this.random = new Random(seed);
+               this.counter = 0;
+       }
+
+       private void randomString(StringValue target) {
+               
+               int length = valueMode == ValueMode.FIX_LENGTH ?
+                       valueLength :
+                       valueLength - random.nextInt(valueLength / 3);
+
+               target.setLength(0);
+               for (int i = 0; i < length; i++) {
+                       target.append(alpha[random.nextInt(alpha.length)]);
+               }
+       }
+}

Reply via email to