http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 1f79067..7de8651 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A version of the {@link IOManager} that uses asynchronous I/O.
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
        public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID 
channelID,
                                                                
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
        {
-               Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+               checkState(!shutdown, "I/O-Manger is closed.");
                return new AsynchronousBlockWriter(channelID, 
this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
        }
        
        @Override
-       public BlockChannelWriterWithCallback 
createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback 
callback) throws IOException {
-               Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+       public BlockChannelWriterWithCallback 
createBlockChannelWriter(FileIOChannel.ID channelID, 
RequestDoneCallback<MemorySegment> callback) throws IOException {
+               checkState(!shutdown, "I/O-Manger is closed.");
                return new AsynchronousBlockWriterWithCallback(channelID, 
this.writers[channelID.getThreadNum()].requestQueue, callback);
        }
        
@@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
        public BlockChannelReader createBlockChannelReader(FileIOChannel.ID 
channelID,
                                                                                
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
        {
-               Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+               checkState(!shutdown, "I/O-Manger is closed.");
                return new AsynchronousBlockReader(channelID, 
this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
        }
        
@@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
        public BulkBlockChannelReader 
createBulkBlockChannelReader(FileIOChannel.ID channelID,
                        List<MemorySegment> targetSegments,     int numBlocks) 
throws IOException
        {
-               Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+               checkState(!shutdown, "I/O-Manger is closed.");
                return new AsynchronousBulkBlockReader(channelID, 
this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index 1b7adae..69791f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -33,7 +33,6 @@ interface IORequest {
        public void requestDone(IOException ioex);
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's reading 
thread. 
  */
@@ -47,7 +46,6 @@ interface ReadRequest extends IORequest {
        public void read() throws IOException;
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's writing 
thread.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
index 78699e2..95f3dc7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * A {@link RequestDoneCallback} that adds the memory segments to a blocking 
queue.
  */
-public class QueuingCallback implements RequestDoneCallback {
+public class QueuingCallback implements RequestDoneCallback<MemorySegment> {
 
        private final LinkedBlockingQueue<MemorySegment> queue;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
index 982343c..f9a0965 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -20,17 +20,15 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * Callback to be executed on completion of an asynchronous I/O request.
- * Depending on success or failure, either method of
- * {@ink #requestSuccessful(MemorySegment)} or {@link 
#requestFailed(MemorySegment, IOException)}
- * is called.
+ * <p>
+ * Depending on success or failure, either {@link #requestSuccessful(Object)}
+ * or {@link #requestSuccessful(Object)} is called.
  */
-public interface RequestDoneCallback {
+public interface RequestDoneCallback<T> {
+
+       void requestSuccessful(T request);
 
-       void requestSuccessful(MemorySegment buffer);
-       
-       void requestFailed(MemorySegment buffer, IOException e);
+       void requestFailed(T buffer, IOException e);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
deleted file mode 100644
index 60232da..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public class Buffer {
-
-       private final MemorySegment memorySegment;
-
-       private final BufferRecycler recycler;
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       private final AtomicInteger referenceCounter;
-
-       private int size;
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       public Buffer(MemorySegment memorySegment, int size, BufferRecycler 
recycler) {
-               this.memorySegment = memorySegment;
-               this.size = size;
-               this.recycler = recycler;
-
-               // we are the first, so we start with reference count of one
-               this.referenceCounter = new AtomicInteger(1);
-       }
-
-       /**
-        * @param toDuplicate Buffer instance to duplicate
-        */
-       private Buffer(Buffer toDuplicate) {
-               if (toDuplicate.referenceCounter.getAndIncrement() == 0) {
-                       throw new IllegalStateException("Buffer was released 
before duplication.");
-               }
-               
-               this.memorySegment = toDuplicate.memorySegment;
-               this.size = toDuplicate.size;
-               this.recycler = toDuplicate.recycler;
-               this.referenceCounter = toDuplicate.referenceCounter;
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       public MemorySegment getMemorySegment() {
-               return this.memorySegment;
-       }
-
-       public int size() {
-               return this.size;
-       }
-
-       public void limitSize(int size) {
-               if (size >= 0 && size <= this.memorySegment.size()) {
-                       this.size = size;
-               } else {
-                       throw new IllegalArgumentException();
-               }
-       }
-
-       public void recycleBuffer() {
-               int refCount = this.referenceCounter.decrementAndGet();
-               if (refCount == 0) {
-                       this.recycler.recycle(this.memorySegment);
-               }
-       }
-
-       public Buffer duplicate() {
-               return new Buffer(this);
-       }
-
-       public void copyToBuffer(Buffer destinationBuffer) {
-               if (size() > destinationBuffer.size()) {
-                       throw new IllegalArgumentException("Destination buffer 
is too small to store content of source buffer.");
-               }
-
-               this.memorySegment.copyTo(0, destinationBuffer.memorySegment, 
0, size);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
deleted file mode 100644
index e48ca52..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public interface BufferRecycler {
-
-       /**
-        * Called by {@link org.apache.flink.runtime.io.network.Buffer} to 
return a {@link MemorySegment} to its original buffer pool.
-        *
-        * @param buffer the segment to be recycled
-        */
-       void recycle(MemorySegment buffer);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
deleted file mode 100644
index c841872..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import akka.actor.ActorRef;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
-import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.Channel;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.channels.InputChannel;
-import org.apache.flink.runtime.io.network.channels.OutputChannel;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.ExceptionUtils;
-import scala.concurrent.duration.FiniteDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The channel manager sets up the network buffers and dispatches data between 
channels.
- */
-public class ChannelManager implements EnvelopeDispatcher, 
BufferProviderBroker {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ChannelManager.class);
-
-       private final ActorRef channelLookup;
-
-       private final InstanceConnectionInfo connectionInfo;
-
-       private final Map<ChannelID, Channel> channels;
-
-       private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
-
-       private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
-
-       private final GlobalBufferPool globalBufferPool;
-
-       private final NetworkConnectionManager networkConnectionManager;
-       
-       private final InetSocketAddress ourAddress;
-       
-       private final DiscardBufferPool discardBufferPool;
-
-       private final FiniteDuration timeout;
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo 
connectionInfo, int numNetworkBuffers,
-                                               int networkBufferSize, 
NetworkConnectionManager networkConnectionManager,
-                                               FiniteDuration timeout) throws 
IOException {
-
-               this.channelLookup= channelLookup;
-               this.connectionInfo = connectionInfo;
-
-               this.timeout = timeout;
-
-               try {
-                       this.globalBufferPool = new 
GlobalBufferPool(numNetworkBuffers, networkBufferSize);
-               } catch (Throwable e) {
-                       throw new IOException("Failed to instantiate 
GlobalBufferPool.", e);
-               }
-
-               this.networkConnectionManager = networkConnectionManager;
-               networkConnectionManager.start(this);
-
-               // management data structures
-               this.channels = new ConcurrentHashMap<ChannelID, Channel>();
-               this.receiverCache = new ConcurrentHashMap<ChannelID, 
EnvelopeReceiverList>();
-               this.localBuffersPools = new ConcurrentHashMap<AbstractID, 
LocalBufferPoolOwner>();
-               
-               this.ourAddress = new 
InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
-               
-               // a special pool if the data is to be discarded
-               this.discardBufferPool = new DiscardBufferPool();
-       }
-
-       public void shutdown() throws IOException {
-               this.networkConnectionManager.shutdown();
-
-               this.globalBufferPool.destroy();
-       }
-
-       public GlobalBufferPool getGlobalBufferPool() {
-               return globalBufferPool;
-       }
-
-       public NetworkConnectionManager getNetworkConnectionManager() {
-               return networkConnectionManager;
-       }
-       
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //                                               Task registration
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       /**
-        * Registers the given task with the channel manager.
-        *
-        * @param task the task to be registered
-        * @throws InsufficientResourcesException thrown if not enough buffers 
available to safely run this task
-        */
-       public void register(Task task) throws InsufficientResourcesException {
-               // Check if we can safely run this task with the given buffers
-               ensureBufferAvailability(task);
-
-               RuntimeEnvironment environment = task.getEnvironment();
-
-               // 
-------------------------------------------------------------------------------------------------------------
-               //                                       Register output 
channels
-               // 
-------------------------------------------------------------------------------------------------------------
-
-               environment.registerGlobalBufferPool(this.globalBufferPool);
-
-               if (this.localBuffersPools.containsKey(task.getExecutionId())) {
-                       throw new IllegalStateException("Execution " + 
task.getExecutionId() + " has a previous buffer pool owner");
-               }
-
-               for (OutputGate gate : environment.outputGates()) {
-                       // add receiver list hints
-                       for (OutputChannel channel : gate.channels()) {
-                               // register envelope dispatcher with the channel
-                               channel.registerEnvelopeDispatcher(this);
-
-                               switch (channel.getChannelType()) {
-                                       case IN_MEMORY:
-                                               
addReceiverListHint(channel.getID(), channel.getConnectedId());
-                                               break;
-                                       case NETWORK:
-                                               
addReceiverListHint(channel.getConnectedId(), channel.getID());
-                                               break;
-                               }
-
-                               this.channels.put(channel.getID(), channel);
-                       }
-               }
-
-               this.localBuffersPools.put(task.getExecutionId(), environment);
-
-               // 
-------------------------------------------------------------------------------------------------------------
-               //                                       Register input channels
-               // 
-------------------------------------------------------------------------------------------------------------
-
-               // register global
-               for (InputGate<?> gate : environment.inputGates()) {
-                       gate.registerGlobalBufferPool(this.globalBufferPool);
-
-                       for (int i = 0; i < gate.getNumberOfInputChannels(); 
i++) {
-                               InputChannel<? extends IOReadableWritable> 
channel = gate.getInputChannel(i);
-                               channel.registerEnvelopeDispatcher(this);
-
-                               if (channel.getChannelType() == 
ChannelType.IN_MEMORY) {
-                                       addReceiverListHint(channel.getID(), 
channel.getConnectedId());
-                               }
-
-                               this.channels.put(channel.getID(), channel);
-                       }
-
-                       this.localBuffersPools.put(gate.getGateID(), gate);
-               }
-
-               // the number of channels per buffers has changed after 
unregistering the task
-               // => redistribute the number of designated buffers of the 
registered local buffer pools
-               redistributeBuffers();
-       }
-
-       /**
-        * Unregisters the given task from the channel manager.
-        *
-        * @param executionId the ID of the task to be unregistered
-        * @param task the task to be unregistered
-        */
-       public void unregister(ExecutionAttemptID executionId, Task task) {
-               final Environment environment = task.getEnvironment();
-               if (environment == null) {
-                       return;
-               }
-
-               // destroy and remove OUTPUT channels from registered channels 
and cache
-               for (ChannelID id : environment.getOutputChannelIDs()) {
-                       Channel channel = this.channels.remove(id);
-                       if (channel != null) {
-
-                               channel.destroy();
-
-                               
removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
-                       }
-               }
-
-               // destroy and remove INPUT channels from registered channels 
and cache
-               for (ChannelID id : environment.getInputChannelIDs()) {
-                       Channel channel = this.channels.remove(id);
-                       if (channel != null) {
-                               channel.destroy();
-
-                               
removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
-                       }
-               }
-
-               // clear and remove INPUT side buffer pools
-               for (GateID id : environment.getInputGateIDs()) {
-                       LocalBufferPoolOwner bufferPool = 
this.localBuffersPools.remove(id);
-                       if (bufferPool != null) {
-                               bufferPool.clearLocalBufferPool();
-                       }
-               }
-
-               // clear and remove OUTPUT side buffer pool
-               LocalBufferPoolOwner bufferPool = 
this.localBuffersPools.remove(executionId);
-               if (bufferPool != null) {
-                       bufferPool.clearLocalBufferPool();
-               }
-
-               // the number of channels per buffers has changed after 
unregistering the task
-               // => redistribute the number of designated buffers of the 
registered local buffer pools
-               redistributeBuffers();
-       }
-
-       private void removeFromReceiverCacheAndMaybeCloseTcpConnection(Channel 
channel) {
-               EnvelopeReceiverList receiver = 
this.receiverCache.remove(channel.getID());
-
-               if (receiver != null && receiver.hasRemoteReceiver()) {
-                       
networkConnectionManager.close(receiver.getRemoteReceiver());
-               }
-       }
-
-       /**
-        * Ensures that the channel manager has enough buffers to execute the 
given task.
-        * <p>
-        * If there is less than one buffer per channel available, an 
InsufficientResourcesException will be thrown,
-        * because of possible deadlocks. With more then one buffer per 
channel, deadlock-freedom is guaranteed.
-        *
-        * @param task task to be executed
-        * @throws InsufficientResourcesException thrown if not enough buffers 
available to execute the task
-        */
-       private void ensureBufferAvailability(Task task) throws 
InsufficientResourcesException {
-               Environment env = task.getEnvironment();
-
-               int numBuffers = this.globalBufferPool.numBuffers();
-               // existing channels + channels of the task
-               int numChannels = this.channels.size() + 
env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
-
-               // need at least one buffer per channel
-               if (numChannels > 0 && numBuffers / numChannels < 1) {
-                       String msg = String.format("%s has not enough buffers 
to safely execute %s (%d buffers missing)",
-                                       this.connectionInfo.getFQDNHostname(), 
env.getTaskName(), numChannels - numBuffers);
-
-                       throw new InsufficientResourcesException(msg);
-               }
-       }
-
-       /**
-        * Redistributes the buffers among the registered buffer pools. This 
method is called after each task registration
-        * and unregistration.
-        * <p>
-        * Every registered buffer pool gets buffers according to its number of 
channels weighted by the current buffer to
-        * channel ratio.
-        */
-       private void redistributeBuffers() {
-               if (this.localBuffersPools.isEmpty() | this.channels.size() == 
0) {
-                       return;
-               }
-
-               int numBuffers = this.globalBufferPool.numBuffers();
-               int numChannels = this.channels.size();
-
-               double buffersPerChannel = numBuffers / (double) numChannels;
-
-               if (buffersPerChannel < 1.0) {
-                       throw new RuntimeException("System has not enough 
buffers to execute tasks.");
-               }
-
-               // redistribute number of designated buffers per buffer pool
-               for (LocalBufferPoolOwner bufferPool : 
this.localBuffersPools.values()) {
-                       int numDesignatedBuffers = (int) 
Math.ceil(buffersPerChannel * bufferPool.getNumberOfChannels());
-                       
bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers);
-               }
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //                                           Envelope processing
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       private void releaseEnvelope(Envelope envelope) {
-               Buffer buffer = envelope.getBuffer();
-               if (buffer != null) {
-                       buffer.recycleBuffer();
-               }
-       }
-
-       private void addReceiverListHint(ChannelID source, ChannelID 
localReceiver) {
-               EnvelopeReceiverList receiverList = new 
EnvelopeReceiverList(localReceiver);
-
-               if (this.receiverCache.put(source, receiverList) != null) {
-                       LOG.warn("Receiver cache already contained entry for " 
+ source);
-               }
-       }
-
-       private void addReceiverListHint(ChannelID source, RemoteReceiver 
remoteReceiver) {
-               EnvelopeReceiverList receiverList = new 
EnvelopeReceiverList(remoteReceiver);
-
-               if (this.receiverCache.put(source, receiverList) != null) {
-                       LOG.warn("Receiver cache already contained entry for " 
+ source);
-               }
-       }
-
-       private void generateSenderHint(Envelope envelope, RemoteReceiver 
receiver) throws IOException {
-               Channel channel = this.channels.get(envelope.getSource());
-               if (channel == null) {
-                       LOG.error("Cannot find channel for channel ID " + 
envelope.getSource());
-                       return;
-               }
-
-               // Only generate sender hints for output channels
-               if (channel.isInputChannel()) {
-                       return;
-               }
-
-               final ChannelID targetChannelID = channel.getConnectedId();
-               final int connectionIndex = receiver.getConnectionIndex();
-
-               final RemoteReceiver ourAddress = new 
RemoteReceiver(this.ourAddress, connectionIndex);
-               final Envelope senderHint = 
SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
-
-               this.networkConnectionManager.enqueue(senderHint, receiver, 
true);
-       }
-
-       /**
-        * Returns the list of receivers for transfer envelopes produced by the 
channel with the given source channel ID.
-        *
-        * @param jobID
-        *        the ID of the job the given channel ID belongs to
-        * @param sourceChannelID
-        *        the source channel ID for which the receiver list shall be 
retrieved
-        * @return the list of receivers or <code>null</code> if the receiver 
could not be determined
-        * @throws IOException
-        */
-       private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID 
sourceChannelID, boolean reportException) throws IOException {
-               EnvelopeReceiverList receiverList = 
this.receiverCache.get(sourceChannelID);
-
-               if (receiverList != null) {
-                       return receiverList;
-               }
-
-               while (true) {
-                       ConnectionInfoLookupResponse lookupResponse;
-                       lookupResponse = 
AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
-                                       new 
JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-                                                       sourceChannelID), 
timeout).response();
-
-                       if (lookupResponse.receiverReady()) {
-                               receiverList = new 
EnvelopeReceiverList(lookupResponse);
-                               break;
-                       }
-                       else if (lookupResponse.receiverNotReady()) {
-                               try {
-                                       Thread.sleep(100);
-                               } catch (InterruptedException e) {
-                                       if (reportException) {
-                                               throw new IOException("Lookup 
was interrupted.");
-                                       } else {
-                                               return null;
-                                       }
-                               }
-                       }
-                       else if (lookupResponse.isJobAborting()) {
-                               if (reportException) {
-                                       throw new CancelTaskException();
-                               } else {
-                                       return null;
-                               }
-                       }
-                       else if (lookupResponse.receiverNotFound()) {
-                               if (reportException) {
-                                       throw new IOException("Could not find 
the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
-                               } else {
-                                       return null;
-                               }
-                       }
-                       else {
-                               throw new IllegalStateException("Unrecognized 
response to channel lookup.");
-                       }
-               }
-
-               this.receiverCache.put(sourceChannelID, receiverList);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(String.format("Receiver for %s: %s [%s])",
-                                       sourceChannelID,
-                                       receiverList.hasLocalReceiver() ? 
receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(),
-                                       receiverList.hasLocalReceiver() ? 
"local" : "remote"));
-               }
-
-               return receiverList;
-       }
-
-       /**
-        * Invalidates the entries identified by the given channel IDs from the 
receiver lookup cache.
-        *
-        * @param channelIDs channel IDs for entries to invalidate
-        */
-       public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
-               for (ChannelID id : channelIDs) {
-                       this.receiverCache.remove(id);
-               }
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //                                       EnvelopeDispatcher methods
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       @Override
-       public void dispatchFromOutputChannel(Envelope envelope) throws 
IOException, InterruptedException {
-               EnvelopeReceiverList receiverList = 
getReceiverListForEnvelope(envelope, true);
-
-               Buffer srcBuffer = envelope.getBuffer();
-               Buffer destBuffer = null;
-               
-               boolean success = false;
-               
-               try {
-                       if (receiverList.hasLocalReceiver()) {
-                               ChannelID receiver = 
receiverList.getLocalReceiver();
-                               Channel channel = this.channels.get(receiver);
-
-                               if (channel == null) {
-                                       throw new 
LocalReceiverCancelledException(receiver);
-                               }
-
-                               if (!channel.isInputChannel()) {
-                                       throw new IOException("Local receiver " 
+ receiver + " is not an input channel.");
-                               }
-
-                               InputChannel<?> inputChannel = 
(InputChannel<?>) channel;
-                               
-                               // copy the buffer into the memory space of the 
receiver 
-                               if (srcBuffer != null) {
-                                       try {
-                                               destBuffer = 
inputChannel.requestBufferBlocking(srcBuffer.size());
-                                       } catch (InterruptedException e) {
-                                               throw new 
IOException(e.getMessage());
-                                       }
-
-                                       srcBuffer.copyToBuffer(destBuffer);
-                                       envelope.setBuffer(destBuffer);
-                                       srcBuffer.recycleBuffer();
-                               }
-                               
-                               inputChannel.queueEnvelope(envelope);
-                               success = true;
-                       }
-                       else if (receiverList.hasRemoteReceiver()) {
-                               RemoteReceiver remoteReceiver = 
receiverList.getRemoteReceiver();
-
-                               // Generate sender hint before sending the 
first envelope over the network
-                               if (envelope.getSequenceNumber() == 0) {
-                                       generateSenderHint(envelope, 
remoteReceiver);
-                               }
-
-                               this.networkConnectionManager.enqueue(envelope, 
remoteReceiver, false);
-                               success = true;
-                       }
-               } finally {
-                       if (!success) {
-                               if (srcBuffer != null) {
-                                       srcBuffer.recycleBuffer();
-                               }
-                               if (destBuffer != null) {
-                                       destBuffer.recycleBuffer();
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void dispatchFromInputChannel(Envelope envelope) throws 
IOException, InterruptedException {
-               // this method sends only events back from input channels to 
output channels
-               // sanity check that we have no buffer
-               if (envelope.getBuffer() != null) {
-                       throw new RuntimeException("Error: This method can only 
process envelopes without buffers.");
-               }
-               
-               EnvelopeReceiverList receiverList = 
getReceiverListForEnvelope(envelope, true);
-
-               if (receiverList.hasLocalReceiver()) {
-                       ChannelID receiver = receiverList.getLocalReceiver();
-                       Channel channel = this.channels.get(receiver);
-
-                       if (channel == null) {
-                               throw new 
LocalReceiverCancelledException(receiver);
-                       }
-
-                       if (channel.isInputChannel()) {
-                               throw new IOException("Local receiver " + 
receiver + " of backward event is not an output channel.");
-                       }
-
-                       OutputChannel outputChannel = (OutputChannel) channel;
-                       outputChannel.queueEnvelope(envelope);
-               }
-               else if (receiverList.hasRemoteReceiver()) {
-                       RemoteReceiver remoteReceiver = 
receiverList.getRemoteReceiver();
-
-                       // Generate sender hint before sending the first 
envelope over the network
-                       this.networkConnectionManager.enqueue(envelope, 
remoteReceiver, envelope.getSequenceNumber() == 0);
-               }
-       }
-
-       /**
-        * 
-        */
-       @Override
-       public void dispatchFromNetwork(Envelope envelope) throws IOException, 
InterruptedException {
-               // 
========================================================================================
-               //  IMPORTANT
-               //  
-               //  This method is called by the network I/O thread that reads 
the incoming TCP 
-               //  connections. This method must have minimal overhead and not 
throw exception if
-               //  something is wrong with a job or individual transmission, 
but only when something
-               //  is fundamentally broken in the system.
-               // 
========================================================================================
-               
-               // the sender hint event is to let the receiver know where 
exactly the envelope came from.
-               // the receiver will cache the sender id and its connection 
info in its local lookup table
-               // that allows the receiver to send envelopes to the sender 
without first pinging the job manager
-               // for the sender's connection info
-               
-               // Check if the envelope is the special envelope with the 
sender hint event
-               if (SenderHintEvent.isSenderHintEvent(envelope)) {
-                       // Check if this is the final destination of the sender 
hint event before adding it
-                       final SenderHintEvent seh = (SenderHintEvent) 
envelope.deserializeEvents().get(0);
-                       if (this.channels.get(seh.getSource()) != null) {
-                               addReceiverListHint(seh.getSource(), 
seh.getRemoteReceiver());
-                               return;
-                       }
-               }
-               
-               // try and get the receiver list. if we cannot get it anymore, 
the task has been cleared
-               // the code frees the envelope on exception, so we need not to 
anything
-               EnvelopeReceiverList receiverList = 
getReceiverListForEnvelope(envelope, false);
-               if (receiverList == null) {
-                       // receiver is cancelled and cleaned away
-                       releaseEnvelope(envelope);
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Dropping envelope for cleaned up 
receiver.");
-                       }
-
-                       return;
-               }
-
-               if (!receiverList.hasLocalReceiver() || 
receiverList.hasRemoteReceiver()) {
-                       throw new IOException("Bug in network stack: Envelope 
dispatched from the incoming network pipe has no local receiver or has a remote 
receiver");
-               }
-
-               ChannelID localReceiver = receiverList.getLocalReceiver();
-               Channel channel = this.channels.get(localReceiver);
-               
-               // if the channel is null, it means that receiver has been 
cleared already (cancelled or failed).
-               // release the buffer immediately
-               if (channel == null) {
-                       releaseEnvelope(envelope);
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Dropping envelope for cancelled 
receiver " + localReceiver);
-                       }
-               }
-               else {
-                       channel.queueEnvelope(envelope);
-               }
-       }
-
-       /**
-        * 
-        * Upon an exception, this method frees the envelope.
-        */
-       private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope 
envelope, boolean reportException) throws IOException {
-               try {
-                       return getReceiverList(envelope.getJobID(), 
envelope.getSource(), reportException);
-               } catch (IOException e) {
-                       releaseEnvelope(envelope);
-                       throw e;
-               } catch (CancelTaskException e) {
-                       releaseEnvelope(envelope);
-                       throw e;
-               } catch (Throwable t) {
-                       releaseEnvelope(envelope);
-                       ExceptionUtils.rethrow(t, "Error while requesting 
receiver list.");
-                       return null; // silence the compiler
-               }
-       }
-       
-       // 
-----------------------------------------------------------------------------------------------------------------
-       //                                       BufferProviderBroker methods
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       @Override
-       public BufferProvider getBufferProvider(JobID jobID, ChannelID 
sourceChannelID) throws IOException {
-               EnvelopeReceiverList receiverList = getReceiverList(jobID, 
sourceChannelID, false);
-               
-               // check if the receiver is already gone
-               if (receiverList == null) {
-                       return this.discardBufferPool;
-               }
-
-               if (!receiverList.hasLocalReceiver() || 
receiverList.hasRemoteReceiver()) {
-                       throw new IOException("The destination to be looked up 
is not a single local endpoint.");
-               }
-               
-
-               ChannelID localReceiver = receiverList.getLocalReceiver();
-               Channel channel = this.channels.get(localReceiver);
-               
-               if (channel == null) {
-                       // receiver is already canceled
-                       return this.discardBufferPool;
-               }
-
-               if (!channel.isInputChannel()) {
-                       throw new IOException("Channel context for local 
receiver " + localReceiver + " is not an input channel context");
-               }
-
-               return (InputChannel<?>) channel;
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       public void logBufferUtilization() {
-               System.out.println("Buffer utilization at " + 
System.currentTimeMillis());
-
-               System.out.println("\tUnused global buffers: " + 
this.globalBufferPool.numAvailableBuffers());
-
-               System.out.println("\tLocal buffer pool status:");
-
-               for (LocalBufferPoolOwner bufferPool : 
this.localBuffersPools.values()) {
-                       bufferPool.logBufferUtilization();
-               }
-
-               System.out.println("\tIncoming connections:");
-
-               for (Channel channel : this.channels.values()) {
-                       if (channel.isInputChannel()) {
-                               ((InputChannel<?>) 
channel).logQueuedEnvelopes();
-                       }
-               }
-       }
-       
-       public void verifyAllCachesEmpty() {
-               if (!channels.isEmpty()) {
-                       throw new IllegalStateException("Channel manager caches 
not empty: There are still registered channels.");
-               }
-               if (!localBuffersPools.isEmpty()) {
-                       throw new IllegalStateException("Channel manager caches 
not empty: There are still local buffer pools.");
-               }
-               if (!receiverCache.isEmpty()) {
-                       throw new IllegalStateException("Channel manager caches 
not empty: There are still entries in the receiver cache.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
deleted file mode 100644
index 51a0f94..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public class ConnectionInfoLookupResponse implements Serializable {
-
-       private static final long serialVersionUID = 3961171754642077522L;
-
-       
-       private enum ReturnCode {
-               NOT_FOUND, FOUND_AND_RECEIVER_READY, 
FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
-       };
-
-       // was request successful?
-       private ReturnCode returnCode;
-       
-       private RemoteReceiver remoteTarget;
-
-       private ChannelID localTarget;
-       
-       public ConnectionInfoLookupResponse() {}
-
-       public ConnectionInfoLookupResponse(ReturnCode code) {
-               this.returnCode = code;
-               this.remoteTarget = null;
-               this.localTarget = null;
-       }
-       
-       public ConnectionInfoLookupResponse(ReturnCode code, ChannelID 
localTarget) {
-               this.returnCode = code;
-               this.remoteTarget = null;
-               this.localTarget = localTarget;
-       }
-       
-       public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver 
receiver) {
-               this.returnCode = code;
-               this.remoteTarget = receiver;
-               this.localTarget = null;
-       }
-
-       public RemoteReceiver getRemoteTarget() {
-               return this.remoteTarget;
-       }
-
-       public ChannelID getLocalTarget() {
-               return this.localTarget;
-       }
-
-       public boolean receiverNotFound() {
-               return (this.returnCode == ReturnCode.NOT_FOUND);
-       }
-
-       public boolean receiverNotReady() {
-               return (this.returnCode == 
ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-       }
-
-       public boolean receiverReady() {
-               return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
-       }
-
-       public boolean isJobAborting() {
-               return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
-       }
-
-       
-       public static ConnectionInfoLookupResponse 
createReceiverFoundAndReady(ChannelID targetChannelID) {
-               return new 
ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, 
targetChannelID);
-       }
-
-       public static ConnectionInfoLookupResponse 
createReceiverFoundAndReady(RemoteReceiver remoteReceiver) {
-               return new 
ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, 
remoteReceiver);
-       }
-
-       public static ConnectionInfoLookupResponse createReceiverNotFound() {
-               return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND);
-       }
-
-       public static ConnectionInfoLookupResponse createReceiverNotReady() {
-               return new 
ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-       }
-
-       public static ConnectionInfoLookupResponse createJobIsAborting() {
-               return new 
ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING);
-       }
-
-       
-       @Override
-       public String toString() {
-               return this.returnCode.name() + ", local target: " + 
this.localTarget + ", remoteTarget: " + this.remoteTarget;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
new file mode 100644
index 0000000..4a5536b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import 
org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+
+import java.io.IOException;
+
+/**
+ * The connection manager manages physical connections for the (logical) remote
+ * input channels at runtime.
+ */
+public interface ConnectionManager {
+
+       void start(IntermediateResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher) throws IOException;
+
+       /**
+        * Creates a {@link PartitionRequestClient} instance for the given 
{@link RemoteAddress}.
+        */
+       PartitionRequestClient createPartitionRequestClient(RemoteAddress 
remoteAddress) throws IOException;
+
+       int getNumberOfActiveConnections();
+
+       void shutdown() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
deleted file mode 100644
index 53f403c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class Envelope {
-
-       private final JobID jobID;
-
-       private final ChannelID source;
-
-       private final int sequenceNumber;
-
-       private ByteBuffer serializedEventList;
-
-       private Buffer buffer;
-
-       public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
-               this.sequenceNumber = sequenceNumber;
-               this.jobID = jobID;
-               this.source = source;
-       }
-
-       private Envelope(Envelope toDuplicate) {
-               this.jobID = toDuplicate.jobID;
-               this.source = toDuplicate.source;
-               this.sequenceNumber = toDuplicate.sequenceNumber;
-               this.serializedEventList = null;
-               this.buffer = null;
-       }
-
-       public Envelope duplicate() {
-               Envelope duplicate = new Envelope(this);
-               if (hasBuffer()) {
-                       duplicate.setBuffer(this.buffer.duplicate());
-               }
-
-               return duplicate;
-       }
-
-       public Envelope duplicateWithoutBuffer() {
-               return new Envelope(this);
-       }
-
-       public JobID getJobID() {
-               return this.jobID;
-       }
-
-       public ChannelID getSource() {
-               return this.source;
-       }
-
-       public int getSequenceNumber() {
-               return this.sequenceNumber;
-       }
-
-       public void setEventsSerialized(ByteBuffer serializedEventList) {
-               if (this.serializedEventList != null) {
-                       throw new IllegalStateException("Event list has already 
been set.");
-               }
-
-               this.serializedEventList = serializedEventList;
-       }
-
-       public void serializeEventList(List<? extends AbstractEvent> eventList) 
{
-               if (this.serializedEventList != null) {
-                       throw new IllegalStateException("Event list has already 
been set.");
-               }
-
-               this.serializedEventList = serializeEvents(eventList);
-       }
-
-       public ByteBuffer getEventsSerialized() {
-               return this.serializedEventList;
-       }
-
-       public List<? extends AbstractEvent> deserializeEvents() {
-               return deserializeEvents(getClass().getClassLoader());
-       }
-
-       public List<? extends AbstractEvent> deserializeEvents(ClassLoader 
classloader) {
-               if (this.serializedEventList == null) {
-                       return Collections.emptyList();
-               }
-
-               try {
-                       DataInputDeserializer deserializer = new 
DataInputDeserializer(this.serializedEventList);
-
-                       int numEvents = deserializer.readInt();
-                       ArrayList<AbstractEvent> events = new 
ArrayList<AbstractEvent>(numEvents);
-
-                       for (int i = 0; i < numEvents; i++) {
-                               String className = deserializer.readUTF();
-                               Class<? extends AbstractEvent> clazz;
-                               try {
-                                       clazz = 
Class.forName(className).asSubclass(AbstractEvent.class);
-                               } catch (ClassNotFoundException e) {
-                                       throw new RuntimeException("Could not 
load event class '" + className + "'.", e);
-                               } catch (ClassCastException e) {
-                                       throw new RuntimeException("The class 
'" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() 
+ "'.", e);
-                               }
-
-                               AbstractEvent evt = 
InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-                               evt.read(deserializer);
-
-                               events.add(evt);
-                       }
-
-                       return events;
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while deserializing 
the events.", e);
-               }
-       }
-
-       public void setBuffer(Buffer buffer) {
-               this.buffer = buffer;
-       }
-
-       public Buffer getBuffer() {
-               return this.buffer;
-       }
-
-       private ByteBuffer serializeEvents(List<? extends AbstractEvent> 
events) {
-               try {
-                       // create the serialized event list
-                       DataOutputSerializer serializer = events.size() == 0
-                               ? new DataOutputSerializer(4)
-                               : new DataOutputSerializer(events.size() * 32);
-                       serializer.writeInt(events.size());
-
-                       for (AbstractEvent evt : events) {
-                               serializer.writeUTF(evt.getClass().getName());
-                               evt.write(serializer);
-                       }
-
-                       return serializer.wrapAsByteBuffer();
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Error while serializing the 
task events.", e);
-               }
-       }
-
-       public boolean hasBuffer() {
-               return this.buffer != null;
-       }
-
-       @Override
-       public String toString() {
-               return String.format("Envelope %d [source id: %s, buffer size: 
%d, events size: %d]",
-                               this.sequenceNumber, this.getSource(), 
this.buffer == null ? -1 : this.buffer.size(),
-                               this.serializedEventList == null ? -1 : 
this.serializedEventList.remaining());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
deleted file mode 100644
index 59cf491..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-/**
- * A envelope dispatcher receives {@link Envelope}s and sends them to all of 
its destinations.
- */
-public interface EnvelopeDispatcher {
-
-       /**
-        * Dispatches an envelope from an output channel to the receiving input 
channels (forward flow).
-        *
-        * @param envelope envelope to be sent
-        */
-       void dispatchFromOutputChannel(Envelope envelope) throws IOException, 
InterruptedException;
-
-       /**
-        * Dispatches an envelope from an input channel to the receiving output 
channels (backwards flow).
-        *
-        * @param envelope envelope to be sent
-        */
-       void dispatchFromInputChannel(Envelope envelope) throws IOException, 
InterruptedException;
-
-       /**
-        * Dispatches an envelope from an incoming TCP connection.
-        * <p>
-        * After an envelope has been constructed from a TCP socket, this 
method is called to send the envelope to the
-        * receiving input channel.
-        *
-        * @param envelope envelope to be sent
-        */
-       void dispatchFromNetwork(Envelope envelope) throws IOException, 
InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
deleted file mode 100644
index f6e4982..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer 
envelope. Their are three different types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote 
receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- * 
- */
-public class EnvelopeReceiverList {
-
-       private final ChannelID localReceiver;
-
-       private final RemoteReceiver remoteReceiver;
-
-       public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
-               this.localReceiver = cilr.getLocalTarget();
-               this.remoteReceiver = cilr.getRemoteTarget();
-       }
-
-       public EnvelopeReceiverList(ChannelID localReceiver) {
-               this.localReceiver = localReceiver;
-               this.remoteReceiver = null;
-       }
-
-       public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
-               this.localReceiver = null;
-               this.remoteReceiver = remoteReceiver;
-       }
-
-       public boolean hasLocalReceiver() {
-               return this.localReceiver != null;
-       }
-
-       public boolean hasRemoteReceiver() {
-               return this.remoteReceiver != null;
-       }
-
-       public int getTotalNumberOfReceivers() {
-               return (this.localReceiver == null ? 0 : 1) + 
(this.remoteReceiver == null ? 0 : 1);
-       }
-
-       public RemoteReceiver getRemoteReceiver() {
-               return this.remoteReceiver;
-       }
-
-       public ChannelID getLocalReceiver() {
-               return this.localReceiver;
-       }
-       
-       @Override
-       public String toString() {
-               return "local receiver: " + this.localReceiver + ", remote 
receiver: " + this.remoteReceiver;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
deleted file mode 100644
index df4f4ec..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-/**
- * This exception is thrown by the {@link ChannelManager} to indicate that a 
task cannot be accepted because
- * there are not enough resources available to safely execute it.
- * 
- */
-public final class InsufficientResourcesException extends Exception {
-
-       /**
-        * The generated serial version UID.
-        */
-       private static final long serialVersionUID = -8977049569413215169L;
-
-       /**
-        * Constructs a new insufficient resources exception.
-        * 
-        * @param msg
-        *        the message describing the exception
-        */
-       InsufficientResourcesException(final String msg) {
-               super(msg);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 264fde6..894db35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,21 +18,24 @@
 
 package org.apache.flink.runtime.io.network;
 
-import java.io.IOException;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import 
org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
 
-public class LocalConnectionManager implements NetworkConnectionManager {
+import java.io.IOException;
 
-       @Override
-       public void start(ChannelManager channelManager) throws IOException {
-       }
+/**
+ * A connection manager implementation to bypass setup overhead for task 
managers running in local
+ * execution mode.
+ */
+public class LocalConnectionManager implements ConnectionManager {
 
        @Override
-       public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean 
isFirstEnvelope) throws IOException {
+       public void start(IntermediateResultPartitionProvider 
partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        }
 
        @Override
-       public void close(RemoteReceiver receiver) {
-
+       public PartitionRequestClient 
createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+               return null;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
deleted file mode 100644
index 84d2d80..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-
-/**
- *
- */
-public class LocalReceiverCancelledException extends CancelTaskException {
-       private static final long serialVersionUID = 1L;
-
-       private final ChannelID receiver;
-
-       public LocalReceiverCancelledException(ChannelID receiver) {
-               this.receiver = receiver;
-       }
-       
-       
-       public ChannelID getReceiver() {
-               return receiver;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
deleted file mode 100644
index 309c92d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-public interface NetworkConnectionManager {
-
-       public void start(ChannelManager channelManager) throws IOException;
-
-       public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean 
isFirstEnvelope) throws IOException;
-
-       public void close(RemoteReceiver receiver);
-
-       public int getNumberOfActiveConnections();
-
-       public void shutdown() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
new file mode 100644
index 0000000..aa6c64c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import 
org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Network I/O components of each {@link TaskManager} instance.
+ */
+public class NetworkEnvironment {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
+
+       private final ActorRef jobManager;
+
+       private final FiniteDuration jobManagerTimeout;
+
+       private final IntermediateResultPartitionManager partitionManager;
+
+       private final TaskEventDispatcher taskEventDispatcher;
+
+       private final NetworkBufferPool networkBufferPool;
+
+       private final ConnectionManager connectionManager;
+
+       private boolean isShutdown;
+
+       /**
+        * Initializes all network I/O components.
+        */
+       public NetworkEnvironment(ActorRef jobManager, FiniteDuration 
jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException {
+               this.jobManager = checkNotNull(jobManager);
+               this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+
+               this.partitionManager = new 
IntermediateResultPartitionManager();
+               this.taskEventDispatcher = new TaskEventDispatcher();
+
+               // 
--------------------------------------------------------------------
+               // Network buffers
+               // 
--------------------------------------------------------------------
+               try {
+                       networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+               }
+               catch (Throwable t) {
+                       throw new IOException("Failed to instantiate network 
buffer pool: " + t.getMessage(), t);
+               }
+
+               // 
--------------------------------------------------------------------
+               // Network connections
+               // 
--------------------------------------------------------------------
+               final Option<NettyConfig> nettyConfig = config.nettyConfig();
+
+               connectionManager = nettyConfig.isDefined() ? new 
NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
+
+               try {
+                       connectionManager.start(partitionManager, 
taskEventDispatcher);
+               }
+               catch (Throwable t) {
+                       throw new IOException("Failed to instantiate network 
connection manager: " + t.getMessage(), t);
+               }
+       }
+
+       public ActorRef getJobManager() {
+               return jobManager;
+       }
+
+       public FiniteDuration getJobManagerTimeout() {
+               return jobManagerTimeout;
+       }
+
+       public void registerTask(Task task) throws IOException {
+               final ExecutionAttemptID executionId = task.getExecutionId();
+
+               final IntermediateResultPartition[] producedPartitions = 
task.getProducedPartitions();
+               final BufferWriter[] writers = task.getWriters();
+
+               if (writers.length != producedPartitions.length) {
+                       throw new IllegalStateException("Unequal number of 
writers and partitions.");
+               }
+
+               for (int i = 0; i < producedPartitions.length; i++) {
+                       final IntermediateResultPartition partition = 
producedPartitions[i];
+                       final BufferWriter writer = writers[i];
+
+                       // Buffer pool for the partition
+                       BufferPool bufferPool = null;
+
+                       try {
+                               bufferPool = 
networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false);
+                               partition.setBufferPool(bufferPool);
+                               
partitionManager.registerIntermediateResultPartition(partition);
+                       }
+                       catch (Throwable t) {
+                               if (bufferPool != null) {
+                                       bufferPool.destroy();
+                               }
+
+                               if (t instanceof IOException) {
+                                       throw (IOException) t;
+                               }
+                               else {
+                                       throw new IOException(t.getMessage(), 
t);
+                               }
+                       }
+
+                       // Register writer with task event dispatcher
+                       
taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, 
writer.getPartitionId(), writer);
+               }
+
+               // Setup the buffer pool for each buffer reader
+               final BufferReader[] readers = task.getReaders();
+
+               for (BufferReader reader : readers) {
+                       BufferPool bufferPool = null;
+
+                       try {
+                               bufferPool = 
networkBufferPool.createBufferPool(reader.getNumberOfInputChannels(), false);
+                               reader.setBufferPool(bufferPool);
+                       }
+                       catch (Throwable t) {
+                               if (bufferPool != null) {
+                                       bufferPool.destroy();
+                               }
+
+                               if (t instanceof IOException) {
+                                       throw (IOException) t;
+                               }
+                               else {
+                                       throw new IOException(t.getMessage(), 
t);
+                               }
+                       }
+               }
+       }
+
+       public void unregisterTask(Task task) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Unregistering task {} ({}) from network 
environment (state: {}).", task.getTaskNameWithSubtasks(), 
task.getExecutionId(), task.getExecutionState());
+               }
+
+               final ExecutionAttemptID executionId = task.getExecutionId();
+
+               if (task.isCanceledOrFailed()) {
+                       
partitionManager.failIntermediateResultPartitions(executionId);
+               }
+
+               taskEventDispatcher.unregisterWriters(executionId);
+
+               final BufferReader[] readers = task.getReaders();
+
+               if (readers != null) {
+                       for (BufferReader reader : readers) {
+                               try {
+                                       if (reader != null) {
+                                               reader.releaseAllResources();
+                                       }
+                               }
+                               catch (IOException e) {
+                                       LOG.error("Error during release of 
reader resources: " + e.getMessage(), e);
+                               }
+                       }
+               }
+       }
+
+       public IntermediateResultPartitionManager getPartitionManager() {
+               return partitionManager;
+       }
+
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               return taskEventDispatcher;
+       }
+
+       public ConnectionManager getConnectionManager() {
+               return connectionManager;
+       }
+
+       public NetworkBufferPool getNetworkBufferPool() {
+               return networkBufferPool;
+       }
+
+       public boolean hasReleasedAllResources() {
+               String msg = String.format("Network buffer pool: %d missing 
memory segments. %d registered buffer pools. Connection manager: %d active 
connections. Task event dispatcher: %d registered writers.",
+                               
networkBufferPool.getTotalNumberOfMemorySegments() - 
networkBufferPool.getNumberOfAvailableMemorySegments(), 
networkBufferPool.getNumberOfRegisteredBufferPools(), 
connectionManager.getNumberOfActiveConnections(), 
taskEventDispatcher.getNumberOfRegisteredWriters());
+
+               boolean success = 
networkBufferPool.getTotalNumberOfMemorySegments() == 
networkBufferPool.getNumberOfAvailableMemorySegments() &&
+                               
networkBufferPool.getNumberOfRegisteredBufferPools() == 0 &&
+                               
connectionManager.getNumberOfActiveConnections() == 0 &&
+                               
taskEventDispatcher.getNumberOfRegisteredWriters() == 0;
+
+               if (success) {
+                       String successMsg = "Network environment did release 
all resources: " + msg;
+                       LOG.debug(successMsg);
+               }
+               else {
+                       String errMsg = "Network environment did *not* release 
all resources: " + msg;
+
+                       LOG.error(errMsg);
+               }
+
+               return success;
+       }
+
+       /**
+        * Tries to shut down all network I/O components.
+        */
+       public void shutdown() {
+               if (!isShutdown) {
+                       try {
+                               if (networkBufferPool != null) {
+                                       networkBufferPool.destroy();
+                               }
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Network buffer pool did not shut down 
properly: " + t.getMessage(), t);
+                       }
+
+                       if (partitionManager != null) {
+                               try {
+                                       partitionManager.shutdown();
+                               }
+                               catch (Throwable t) {
+                                       LOG.warn("Partition manager did not 
shut down properly: " + t.getMessage(), t);
+                               }
+                       }
+
+                       try {
+                               if (connectionManager != null) {
+                                       connectionManager.shutdown();
+                               }
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Network connection manager did not 
shut down properly: " + t.getMessage(), t);
+                       }
+
+                       isShutdown = true;
+               }
+       }
+
+       public boolean isShutdown() {
+               return isShutdown;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
new file mode 100644
index 0000000..937055b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link RemoteAddress} identifies a connection to a remote task manager by
+ * the socket address and a connection index. This allows multiple connections
+ * to be distinguished by their connection index.
+ * <p>
+ * The connection index is assigned by the {@link IntermediateResult} and
+ * ensures that it is safe to multiplex multiple data transfers over the same
+ * physical TCP connection.
+ */
+public class RemoteAddress implements IOReadableWritable, Serializable {
+
+       private InetSocketAddress address;
+
+       private int connectionIndex;
+
+       public RemoteAddress(InstanceConnectionInfo connectionInfo, int 
connectionIndex) {
+               this(new InetSocketAddress(connectionInfo.address(), 
connectionInfo.dataPort()), connectionIndex);
+       }
+
+       public RemoteAddress(InetSocketAddress address, int connectionIndex) {
+               this.address = checkNotNull(address);
+               checkArgument(connectionIndex >= 0);
+               this.connectionIndex = connectionIndex;
+       }
+
+       public InetSocketAddress getAddress() {
+               return address;
+       }
+
+       public int getConnectionIndex() {
+               return connectionIndex;
+       }
+
+       @Override
+       public int hashCode() {
+               return address.hashCode() + (31 * connectionIndex);
+       }
+
+       @Override
+       public boolean equals(Object other) {
+               if (other.getClass() != RemoteAddress.class) {
+                       return false;
+               }
+
+               final RemoteAddress ra = (RemoteAddress) other;
+               if (!ra.getAddress().equals(address) || ra.getConnectionIndex() 
!= connectionIndex) {
+                       return false;
+               }
+
+               return true;
+       }
+
+       @Override
+       public String toString() {
+               return address + " [" + connectionIndex + "]";
+       }
+
+       // 
------------------------------------------------------------------------
+       // Serialization
+       // 
------------------------------------------------------------------------
+
+       public RemoteAddress() {
+               this.address = null;
+               this.connectionIndex = -1;
+       }
+
+       @Override
+       public void write(final DataOutputView out) throws IOException {
+               final InetAddress ia = address.getAddress();
+               out.writeInt(ia.getAddress().length);
+               out.write(ia.getAddress());
+               out.writeInt(address.getPort());
+
+               out.writeInt(connectionIndex);
+       }
+
+       @Override
+       public void read(final DataInputView in) throws IOException {
+               final byte[] addressBytes = new byte[in.readInt()];
+               in.readFully(addressBytes);
+
+               final InetAddress ia = InetAddress.getByAddress(addressBytes);
+               int port = in.readInt();
+
+               address = new InetSocketAddress(ia, port);
+               connectionIndex = in.readInt();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
deleted file mode 100644
index 436d07d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Objects of this class uniquely identify a connection to a remote {@link 
org.apache.flink.runtime.taskmanager.TaskManager}.
- */
-public final class RemoteReceiver implements IOReadableWritable, Serializable {
-
-       private static final long serialVersionUID = 4304924747853162443L;
-
-       /**
-        * The address of the connection to the remote TaskManager.
-        */
-       private InetSocketAddress connectionAddress;
-
-       /**
-        * The index of the connection to the remote TaskManager.
-        */
-       private int connectionIndex;
-
-       /**
-        * Constructs a new remote receiver object.
-        * 
-        * @param connectionAddress
-        *        the address of the connection to the remote TaskManager
-        * @param connectionIndex
-        *        the index of the connection to the remote TaskManager
-        */
-       public RemoteReceiver(final InetSocketAddress connectionAddress, final 
int connectionIndex) {
-               if (connectionAddress == null) {
-                       throw new IllegalArgumentException("Argument 
connectionAddress must not be null");
-               }
-               if (connectionIndex < 0) {
-                       throw new IllegalArgumentException("Argument 
connectionIndex must be a non-negative integer number");
-               }
-
-               this.connectionAddress = connectionAddress;
-               this.connectionIndex = connectionIndex;
-       }
-
-       /**
-        * Default constructor for serialization/deserialization.
-        */
-       public RemoteReceiver() {
-               this.connectionAddress = null;
-               this.connectionIndex = -1;
-       }
-
-       /**
-        * Returns the address of the connection to the remote TaskManager.
-        * 
-        * @return the address of the connection to the remote TaskManager
-        */
-       public InetSocketAddress getConnectionAddress() {
-               return this.connectionAddress;
-       }
-
-       /**
-        * Returns the index of the connection to the remote TaskManager.
-        * 
-        * @return the index of the connection to the remote TaskManager
-        */
-       public int getConnectionIndex() {
-               return this.connectionIndex;
-       }
-
-
-       @Override
-       public int hashCode() {
-               return this.connectionAddress.hashCode() + (31 * 
this.connectionIndex);
-       }
-
-
-       @Override
-       public boolean equals(final Object obj) {
-
-               if (!(obj instanceof RemoteReceiver)) {
-                       return false;
-               }
-
-               final RemoteReceiver rr = (RemoteReceiver) obj;
-               if (!this.connectionAddress.equals(rr.connectionAddress)) {
-                       return false;
-               }
-
-               if (this.connectionIndex != rr.connectionIndex) {
-                       return false;
-               }
-
-               return true;
-       }
-
-
-       @Override
-       public void write(final DataOutputView out) throws IOException {
-
-               final InetAddress ia = this.connectionAddress.getAddress();
-               out.writeInt(ia.getAddress().length);
-               out.write(ia.getAddress());
-               out.writeInt(this.connectionAddress.getPort());
-
-               out.writeInt(this.connectionIndex);
-       }
-
-
-       @Override
-       public void read(final DataInputView in) throws IOException {
-               final int addr_length = in.readInt();
-               final byte[] address = new byte[addr_length];
-               in.readFully(address);
-
-               InetAddress ia = InetAddress.getByAddress(address);
-               int port = in.readInt();
-               this.connectionAddress = new InetSocketAddress(ia, port);
-
-               this.connectionIndex = in.readInt();
-       }
-
-
-       @Override
-       public String toString() {
-               return this.connectionAddress + " (" + this.connectionIndex + 
")";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
deleted file mode 100644
index 7cdbabc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public final class SenderHintEvent extends AbstractEvent {
-
-       /**
-        * The sequence number that will be set for transfer envelopes which 
contain the sender hint event.
-        */
-       private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
-
-       private final ChannelID source;
-
-       private final RemoteReceiver remoteReceiver;
-
-       SenderHintEvent(final ChannelID source, final RemoteReceiver 
remoteReceiver) {
-
-               if (source == null) {
-                       throw new IllegalArgumentException("Argument source 
must not be null");
-               }
-
-               if (remoteReceiver == null) {
-                       throw new IllegalArgumentException("Argument 
remoteReceiver must not be null");
-               }
-
-               this.source = source;
-               this.remoteReceiver = remoteReceiver;
-       }
-
-       public SenderHintEvent() {
-
-               this.source = new ChannelID();
-               this.remoteReceiver = new RemoteReceiver();
-       }
-
-       public ChannelID getSource() {
-
-               return this.source;
-       }
-
-       public RemoteReceiver getRemoteReceiver() {
-
-               return this.remoteReceiver;
-       }
-
-
-       @Override
-       public void write(final DataOutputView out) throws IOException {
-
-               this.source.write(out);
-               this.remoteReceiver.write(out);
-       }
-
-
-       @Override
-       public void read(final DataInputView in) throws IOException {
-
-               this.source.read(in);
-               this.remoteReceiver.read(in);
-       }
-
-       public static Envelope createEnvelopeWithEvent(final Envelope 
originalEnvelope,
-                       final ChannelID source, final RemoteReceiver 
remoteReceiver) {
-
-               final Envelope envelope = new 
Envelope(SENDER_HINT_SEQUENCE_NUMBER,
-                       originalEnvelope.getJobID(), 
originalEnvelope.getSource());
-
-               final SenderHintEvent senderEvent = new SenderHintEvent(source, 
remoteReceiver);
-               envelope.serializeEventList(Arrays.asList(senderEvent));
-
-               return envelope;
-       }
-
-       static boolean isSenderHintEvent(final Envelope envelope) {
-
-               if (envelope.getSequenceNumber() != 
SENDER_HINT_SEQUENCE_NUMBER) {
-                       return false;
-               }
-
-               if (envelope.getBuffer() != null) {
-                       return false;
-               }
-
-               List<? extends AbstractEvent> events = 
envelope.deserializeEvents();
-
-               if (events.size() != 1) {
-                       return false;
-               }
-
-               if (!(events.get(0) instanceof SenderHintEvent)) {
-                       return false;
-               }
-
-               return true;
-       }
-}

Reply via email to