KAFKA-1928; Move kafka.network over to using the network classes in 
org.apache.kafka.common.network; patched by Gwen Shapira; reviewed by Joel 
Koshy, Jay Kreps, Jiangjie Qin, Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78ba492e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78ba492e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78ba492e

Branch: refs/heads/trunk
Commit: 78ba492e3e70fd9db61bc82469371d04a8d6b762
Parents: d22987f
Author: Gwen Shapira <csh...@gmail.com>
Authored: Wed Jun 3 21:40:35 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Jun 3 21:40:35 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  |  78 ++---
 .../kafka/clients/CommonClientConfigs.java      |   2 +
 .../apache/kafka/clients/InFlightRequests.java  |  18 +-
 .../org/apache/kafka/clients/KafkaClient.java   |   8 +-
 .../org/apache/kafka/clients/NetworkClient.java |  59 ++--
 .../kafka/clients/consumer/ConsumerConfig.java  |  11 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   3 +-
 .../clients/consumer/internals/Coordinator.java |   4 +-
 .../clients/consumer/internals/Fetcher.java     |  10 +-
 .../kafka/clients/producer/KafkaProducer.java   |   3 +-
 .../kafka/clients/producer/ProducerConfig.java  |   6 +-
 .../clients/producer/internals/Sender.java      |   6 +-
 .../main/java/org/apache/kafka/common/Node.java |  10 +
 .../kafka/common/network/ByteBufferReceive.java |  10 +-
 .../kafka/common/network/ByteBufferSend.java    |  20 +-
 .../common/network/InvalidReceiveException.java |  30 ++
 .../apache/kafka/common/network/MultiSend.java  | 100 ++++++
 .../kafka/common/network/NetworkReceive.java    |  59 +++-
 .../kafka/common/network/NetworkSend.java       |   2 +-
 .../apache/kafka/common/network/Receive.java    |   8 +-
 .../apache/kafka/common/network/Selectable.java |  16 +-
 .../apache/kafka/common/network/Selector.java   | 230 ++++++++-----
 .../org/apache/kafka/common/network/Send.java   |  18 +-
 .../kafka/common/requests/RequestSend.java      |   2 +-
 .../kafka/common/requests/ResponseSend.java     |  41 +++
 .../org/apache/kafka/clients/MockClient.java    |   6 +-
 .../apache/kafka/clients/NetworkClientTest.java |   8 +-
 .../kafka/common/network/SelectorTest.java      |  86 ++---
 .../org/apache/kafka/test/MockSelector.java     |  25 +-
 core/src/main/scala/kafka/Kafka.scala           |  12 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |   2 +-
 .../kafka/api/ConsumerMetadataRequest.scala     |   7 +-
 .../kafka/api/ControlledShutdownRequest.scala   |   9 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../main/scala/kafka/api/FetchResponse.scala    |  73 ++--
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  12 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  10 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |  15 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   7 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   7 +-
 core/src/main/scala/kafka/api/RequestKeys.scala |   4 +-
 .../scala/kafka/api/StopReplicaRequest.scala    |   4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   8 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   4 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   2 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |  19 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   4 +-
 .../controller/ControllerChannelManager.scala   |  11 +-
 .../kafka/javaapi/TopicMetadataRequest.scala    |   7 +-
 .../scala/kafka/network/BlockingChannel.scala   |  21 +-
 .../network/BoundedByteBufferReceive.scala      |  90 -----
 .../kafka/network/BoundedByteBufferSend.scala   |  71 ----
 .../scala/kafka/network/ByteBufferSend.scala    |  40 ---
 core/src/main/scala/kafka/network/Handler.scala |   6 +-
 .../scala/kafka/network/RequestChannel.scala    |  35 +-
 .../kafka/network/RequestOrResponseSend.scala   |  57 ++++
 .../main/scala/kafka/network/SocketServer.scala | 334 ++++++++-----------
 .../main/scala/kafka/network/Transmission.scala | 122 -------
 .../scala/kafka/producer/SyncProducer.scala     |  19 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  44 +--
 .../main/scala/kafka/server/KafkaConfig.scala   |  56 +++-
 .../main/scala/kafka/server/KafkaServer.scala   |  33 +-
 .../scala/kafka/server/MessageSetSend.scala     |  71 ----
 .../kafka/tools/ConsumerOffsetChecker.scala     |   2 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   6 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala |  17 +-
 .../unit/kafka/network/SocketServerTest.scala   |  41 +--
 .../kafka/server/KafkaConfigConfigDefTest.scala |   8 +
 68 files changed, 1075 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index da76cc2..9ebda5e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -21,22 +21,22 @@ import java.util.Map;
  */
 final class ClusterConnectionStates {
     private final long reconnectBackoffMs;
-    private final Map<Integer, NodeConnectionState> nodeState;
+    private final Map<String, NodeConnectionState> nodeState;
 
     public ClusterConnectionStates(long reconnectBackoffMs) {
         this.reconnectBackoffMs = reconnectBackoffMs;
-        this.nodeState = new HashMap<Integer, NodeConnectionState>();
+        this.nodeState = new HashMap<String, NodeConnectionState>();
     }
 
     /**
-     * Return true iff we can currently initiate a new connection to the given 
node. This will be the case if we are not
+     * Return true iff we can currently initiate a new connection. This will 
be the case if we are not
      * connected and haven't been connected for at least the minimum 
reconnection backoff period.
-     * @param node The node id to check
+     * @param id The connection id to check
      * @param now The current time in MS
      * @return true if we can initiate a new connection
      */
-    public boolean canConnect(int node, long now) {
-        NodeConnectionState state = nodeState.get(node);
+    public boolean canConnect(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
         if (state == null)
             return true;
         else
@@ -45,11 +45,11 @@ final class ClusterConnectionStates {
 
     /**
      * Return true if we are disconnected from the given node and can't 
re-establish a connection yet
-     * @param node The node to check
+     * @param id The connection to check
      * @param now The current time in ms
      */
-    public boolean isBlackedOut(int node, long now) {
-        NodeConnectionState state = nodeState.get(node);
+    public boolean isBlackedOut(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
         if (state == null)
             return false;
         else
@@ -60,11 +60,11 @@ final class ClusterConnectionStates {
      * Returns the number of milliseconds to wait, based on the connection 
state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting 
or connected, this handles slow/stalled
      * connections.
-     * @param node The node to check
+     * @param id The connection to check
      * @param now The current time in ms
      */
-    public long connectionDelay(int node, long now) {
-        NodeConnectionState state = nodeState.get(node);
+    public long connectionDelay(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
         if (state == null) return 0;
         long timeWaited = now - state.lastConnectAttemptMs;
         if (state.state == ConnectionState.DISCONNECTED) {
@@ -77,67 +77,67 @@ final class ClusterConnectionStates {
     }
 
     /**
-     * Enter the connecting state for the given node.
-     * @param node The id of the node we are connecting to
+     * Enter the connecting state for the given connection.
+     * @param id The id of the connection
      * @param now The current time.
      */
-    public void connecting(int node, long now) {
-        nodeState.put(node, new 
NodeConnectionState(ConnectionState.CONNECTING, now));
+    public void connecting(String id, long now) {
+        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, 
now));
     }
 
     /**
-     * Return true iff we have a connection to the give node
-     * @param node The id of the node to check
+     * Return true iff a specific connection is connected
+     * @param id The id of the connection to check
      */
-    public boolean isConnected(int node) {
-        NodeConnectionState state = nodeState.get(node);
+    public boolean isConnected(String id) {
+        NodeConnectionState state = nodeState.get(id);
         return state != null && state.state == ConnectionState.CONNECTED;
     }
 
     /**
-     * Return true iff we are in the process of connecting to the given node
-     * @param node The id of the node
+     * Return true iff we are in the process of connecting
+     * @param id The id of the connection
      */
-    public boolean isConnecting(int node) {
-        NodeConnectionState state = nodeState.get(node);
+    public boolean isConnecting(String id) {
+        NodeConnectionState state = nodeState.get(id);
         return state != null && state.state == ConnectionState.CONNECTING;
     }
 
     /**
-     * Enter the connected state for the given node
-     * @param node The node we have connected to
+     * Enter the connected state for the given connection
+     * @param id The connection identifier
      */
-    public void connected(int node) {
-        NodeConnectionState nodeState = nodeState(node);
+    public void connected(String id) {
+        NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.CONNECTED;
     }
 
     /**
      * Enter the disconnected state for the given node
-     * @param node The node we have disconnected from
+     * @param id The connection we have disconnected
      */
-    public void disconnected(int node) {
-        NodeConnectionState nodeState = nodeState(node);
+    public void disconnected(String id) {
+        NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.DISCONNECTED;
     }
     
     /**
-     * Get the state of our connection to the given node
-     * @param node The id of the node
+     * Get the state of a given connection
+     * @param id The id of the connection
      * @return The state of our connection
      */
-    public ConnectionState connectionState(int node) {
-        return nodeState(node).state;
+    public ConnectionState connectionState(String id) {
+        return nodeState(id).state;
     }
     
     /**
      * Get the state of a given node
-     * @param node The node to fetch the state for
+     * @param id The connection to fetch the state for
      */
-    private NodeConnectionState nodeState(int node) {
-        NodeConnectionState state = this.nodeState.get(node);
+    private NodeConnectionState nodeState(String id) {
+        NodeConnectionState state = this.nodeState.get(id);
         if (state == null)
-            throw new IllegalStateException("No entry found for node " + node);
+            throw new IllegalStateException("No entry found for connection " + 
id);
         return state;
     }
     

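The connection-state map is now keyed by the String connection id rather than an
integer broker id. A minimal sketch of the lifecycle with the new keys (hypothetical
example, placed in the org.apache.kafka.clients package because the class is
package-private; the 10-second backoff is an arbitrary value):

package org.apache.kafka.clients;

public class ConnectionStatesSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // 10s reconnect backoff, chosen arbitrarily for this sketch
        ClusterConnectionStates states = new ClusterConnectionStates(10 * 1000L);

        String id = "0";                                 // Node.idString() of broker 0
        System.out.println(states.canConnect(id, now));  // true: no entry yet
        states.connecting(id, now);
        states.connected(id);
        System.out.println(states.isConnected(id));      // true
        states.disconnected(id);
        // blacked out until the reconnect backoff elapses
        System.out.println(states.canConnect(id, now));  // false
    }
}
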
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index cf32e4e..2c421f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -55,4 +55,6 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = 
"metric.reporters";
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of 
classes to use as metrics reporters. Implementing the 
<code>MetricReporter</code> interface allows plugging in classes that will be 
notified of new metric creation. The JmxReporter is always included to register 
JMX statistics.";
 
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = 
"connections.max.idle.ms";
+    public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle 
connections after the number of milliseconds specified by this config.";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index 936487b..15d00d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -24,14 +24,14 @@ import java.util.Map;
 final class InFlightRequests {
 
     private final int maxInFlightRequestsPerConnection;
-    private final Map<Integer, Deque<ClientRequest>> requests = new 
HashMap<Integer, Deque<ClientRequest>>();
+    private final Map<String, Deque<ClientRequest>> requests = new 
HashMap<String, Deque<ClientRequest>>();
 
     public InFlightRequests(int maxInFlightRequestsPerConnection) {
         this.maxInFlightRequestsPerConnection = 
maxInFlightRequestsPerConnection;
     }
 
     /**
-     * Add the given request to the queue for the node it was directed to
+     * Add the given request to the queue for the connection it was directed to
      */
     public void add(ClientRequest request) {
         Deque<ClientRequest> reqs = 
this.requests.get(request.request().destination());
@@ -45,7 +45,7 @@ final class InFlightRequests {
     /**
      * Get the request queue for the given node
      */
-    private Deque<ClientRequest> requestQueue(int node) {
+    private Deque<ClientRequest> requestQueue(String node) {
         Deque<ClientRequest> reqs = requests.get(node);
         if (reqs == null || reqs.isEmpty())
             throw new IllegalStateException("Response from server for which 
there are no in-flight requests.");
@@ -55,7 +55,7 @@ final class InFlightRequests {
     /**
      * Get the oldest request (the one that that will be completed next) for 
the given node
      */
-    public ClientRequest completeNext(int node) {
+    public ClientRequest completeNext(String node) {
         return requestQueue(node).pollLast();
     }
 
@@ -63,7 +63,7 @@ final class InFlightRequests {
      * Get the last request we sent to the given node (but don't remove it 
from the queue)
      * @param node The node id
      */
-    public ClientRequest lastSent(int node) {
+    public ClientRequest lastSent(String node) {
         return requestQueue(node).peekFirst();
     }
 
@@ -72,7 +72,7 @@ final class InFlightRequests {
      * @param node The node the request was sent to
      * @return The request
      */
-    public ClientRequest completeLastSent(int node) {
+    public ClientRequest completeLastSent(String node) {
         return requestQueue(node).pollFirst();
     }
 
@@ -82,7 +82,7 @@ final class InFlightRequests {
      * @param node Node in question
      * @return true iff we have no requests still being sent to the given node
      */
-    public boolean canSendMore(int node) {
+    public boolean canSendMore(String node) {
         Deque<ClientRequest> queue = requests.get(node);
         return queue == null || queue.isEmpty() ||
                (queue.peekFirst().request().completed() && queue.size() < 
this.maxInFlightRequestsPerConnection);
@@ -93,7 +93,7 @@ final class InFlightRequests {
      * @param node The node
      * @return The request count.
      */
-    public int inFlightRequestCount(int node) {
+    public int inFlightRequestCount(String node) {
         Deque<ClientRequest> queue = requests.get(node);
         return queue == null ? 0 : queue.size();
     }
@@ -114,7 +114,7 @@ final class InFlightRequests {
      * @param node The node
      * @return All the in-flight requests for that node that have been removed
      */
-    public Iterable<ClientRequest> clearAll(int node) {
+    public Iterable<ClientRequest> clearAll(String node) {
         Deque<ClientRequest> reqs = requests.get(node);
         if (reqs == null) {
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 1311f85..7ab2503 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -81,13 +81,13 @@ public interface KafkaClient extends Closeable {
     public List<ClientResponse> poll(long timeout, long now);
 
     /**
-     * Complete all in-flight requests for a given node
+     * Complete all in-flight requests for a given connection
      * 
-     * @param node The node to complete requests for
+     * @param id The connection to complete requests for
      * @param now The current time in ms
      * @return All requests that complete during this time period.
      */
-    public List<ClientResponse> completeAll(int node, long now);
+    public List<ClientResponse> completeAll(String id, long now);
 
     /**
      * Complete all in-flight requests
@@ -117,7 +117,7 @@ public interface KafkaClient extends Closeable {
      * 
      * @param nodeId The id of the node
      */
-    public int inFlightRequestCount(int nodeId);
+    public int inFlightRequestCount(String nodeId);
 
     /**
      * Generate a request header for the next request

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 435fbb5..48fe796 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -22,8 +22,8 @@ import java.util.Set;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -111,7 +111,7 @@ public class NetworkClient implements KafkaClient {
         if (isReady(node, now))
             return true;
 
-        if (connectionStates.canConnect(node.id(), now))
+        if (connectionStates.canConnect(node.idString(), now))
             // if we are interested in sending to a node and we don't have a 
connection to it, initiate one
             initiateConnect(node, now);
 
@@ -129,7 +129,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public long connectionDelay(Node node, long now) {
-        return connectionStates.connectionDelay(node.id(), now);
+        return connectionStates.connectionDelay(node.idString(), now);
     }
 
     /**
@@ -142,7 +142,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean connectionFailed(Node node) {
-        return 
connectionStates.connectionState(node.id()).equals(ConnectionState.DISCONNECTED);
+        return 
connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
     }
 
     /**
@@ -154,7 +154,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public boolean isReady(Node node, long now) {
-        int nodeId = node.id();
+        String nodeId = node.idString();
         if (!this.metadataFetchInProgress && 
this.metadata.timeToNextUpdate(now) == 0)
             // if we need to update our metadata now declare all requests 
unready to make metadata requests first
             // priority
@@ -165,11 +165,11 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Are we connected and ready and able to send more requests to the given 
node?
+     * Are we connected and ready and able to send more requests to the given 
connection?
      * 
      * @param node The node
      */
-    private boolean isSendable(int node) {
+    private boolean isSendable(String node) {
         return connectionStates.isConnected(node) && 
inFlightRequests.canSendMore(node);
     }
 
@@ -179,7 +179,7 @@ public class NetworkClient implements KafkaClient {
      * @param node The node to check
      * @return The connection state
      */
-    public ConnectionState connectionState(int node) {
+    public ConnectionState connectionState(String node) {
         return connectionStates.connectionState(node);
     }
 
@@ -190,7 +190,7 @@ public class NetworkClient implements KafkaClient {
      */
     @Override
     public void send(ClientRequest request) {
-        int nodeId = request.request().destination();
+        String nodeId = request.request().destination();
         if (!isSendable(nodeId))
             throw new IllegalStateException("Attempt to send a request to node 
" + nodeId + " which is not ready.");
 
@@ -252,7 +252,7 @@ public class NetworkClient implements KafkaClient {
      * @return All the collected responses
      */
     @Override
-    public List<ClientResponse> completeAll(int node, long now) {
+    public List<ClientResponse> completeAll(String node, long now) {
         try {
             this.selector.muteAll();
             this.selector.unmute(node);
@@ -288,8 +288,8 @@ public class NetworkClient implements KafkaClient {
      * Get the number of in-flight requests for a given node
      */
     @Override
-    public int inFlightRequestCount(int nodeId) {
-        return this.inFlightRequests.inFlightRequestCount(nodeId);
+    public int inFlightRequestCount(String node) {
+        return this.inFlightRequests.inFlightRequestCount(node);
     }
 
     /**
@@ -334,11 +334,11 @@ public class NetworkClient implements KafkaClient {
         for (int i = 0; i < nodes.size(); i++) {
             int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
             Node node = nodes.get(idx);
-            int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.id());
-            if (currInflight == 0 && 
this.connectionStates.isConnected(node.id())) {
+            int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
+            if (currInflight == 0 && 
this.connectionStates.isConnected(node.idString())) {
                 // if we find an established connection with no in-flight 
requests we can stop right away
                 return node;
-            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && 
currInflight < inflight) {
+            } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
                 // otherwise if this is the best we have found so far, record 
that
                 inflight = currInflight;
                 found = node;
@@ -355,7 +355,7 @@ public class NetworkClient implements KafkaClient {
      */
     private void handleCompletedSends(List<ClientResponse> responses, long 
now) {
         // if no response is expected then when the send is completed, return 
it
-        for (NetworkSend send : this.selector.completedSends()) {
+        for (Send send : this.selector.completedSends()) {
             ClientRequest request = 
this.inFlightRequests.lastSent(send.destination());
             if (!request.expectResponse()) {
                 this.inFlightRequests.completeLastSent(send.destination());
@@ -372,7 +372,7 @@ public class NetworkClient implements KafkaClient {
      */
     private void handleCompletedReceives(List<ClientResponse> responses, long 
now) {
         for (NetworkReceive receive : this.selector.completedReceives()) {
-            int source = receive.source();
+            String source = receive.source();
             ClientRequest req = inFlightRequests.completeNext(source);
             ResponseHeader header = ResponseHeader.parse(receive.payload());
             short apiKey = req.request().header().apiKey();
@@ -412,7 +412,7 @@ public class NetworkClient implements KafkaClient {
      * @param now The current time
      */
     private void handleDisconnections(List<ClientResponse> responses, long 
now) {
-        for (int node : this.selector.disconnected()) {
+        for (String node : this.selector.disconnected()) {
             connectionStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
             for (ClientRequest request : this.inFlightRequests.clearAll(node)) 
{
@@ -433,9 +433,9 @@ public class NetworkClient implements KafkaClient {
      * Record any newly completed connections
      */
     private void handleConnections() {
-        for (Integer id : this.selector.connected()) {
-            log.debug("Completed connection to node {}", id);
-            this.connectionStates.connected(id);
+        for (String node : this.selector.connected()) {
+            log.debug("Completed connection to node {}", node);
+            this.connectionStates.connected(node);
         }
     }
 
@@ -451,7 +451,7 @@ public class NetworkClient implements KafkaClient {
     /**
      * Create a metadata request for the given topics
      */
-    private ClientRequest metadataRequest(long now, int node, Set<String> 
topics) {
+    private ClientRequest metadataRequest(long now, String node, Set<String> 
topics) {
         MetadataRequest metadata = new MetadataRequest(new 
ArrayList<String>(topics));
         RequestSend send = new RequestSend(node, 
nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
         return new ClientRequest(now, true, send, null);
@@ -470,15 +470,17 @@ public class NetworkClient implements KafkaClient {
             this.lastNoNodeAvailableMs = now;
             return;
         }
+        String nodeConnectionId = node.idString();
 
-        if (connectionStates.isConnected(node.id()) && 
inFlightRequests.canSendMore(node.id())) {
+
+        if (connectionStates.isConnected(nodeConnectionId) && 
inFlightRequests.canSendMore(nodeConnectionId)) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
-            ClientRequest metadataRequest = metadataRequest(now, node.id(), 
topics);
+            ClientRequest metadataRequest = metadataRequest(now, 
nodeConnectionId, topics);
             log.debug("Sending metadata request {} to node {}", 
metadataRequest, node.id());
             this.selector.send(metadataRequest.request());
             this.inFlightRequests.add(metadataRequest);
-        } else if (connectionStates.canConnect(node.id(), now)) {
+        } else if (connectionStates.canConnect(nodeConnectionId, now)) {
             // we don't have a connection to this node right now, make one
             log.debug("Initialize connection to node {} for sending metadata 
request", node.id());
             initiateConnect(node, now);
@@ -497,16 +499,17 @@ public class NetworkClient implements KafkaClient {
      * Initiate a connection to the given node
      */
     private void initiateConnect(Node node, long now) {
+        String nodeConnectionId = node.idString();
         try {
             log.debug("Initiating connection to node {} at {}:{}.", node.id(), 
node.host(), node.port());
-            this.connectionStates.connecting(node.id(), now);
-            selector.connect(node.id(),
+            this.connectionStates.connecting(nodeConnectionId, now);
+            selector.connect(nodeConnectionId,
                              new InetSocketAddress(node.host(), node.port()),
                              this.socketSendBuffer,
                              this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
-            connectionStates.disconnected(node.id());
+            connectionStates.disconnected(nodeConnectionId);
             /* maybe the problem is our metadata, update it */
             metadata.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), 
node.host(), node.port(), e);

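NetworkClient now hands the selector node.idString() as the connection id everywhere.
A minimal sketch of driving the common Selector directly with a String id (the broker
address, buffer sizes and empty payload are placeholders; the constructor shape follows
the KafkaConsumer/KafkaProducer changes further down):

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.SystemTime;

public class SelectorSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = new Selector(9 * 60 * 1000L, new Metrics(), new SystemTime(),
                                         "sketch", new LinkedHashMap<String, String>());
        String id = "0";  // Node.idString() of the target broker
        selector.connect(id, new InetSocketAddress("localhost", 9092), 64 * 1024, 64 * 1024);

        // Queue a (meaningless) size-delimited send and run one round of I/O.
        selector.send(new NetworkSend(id, ByteBuffer.wrap(new byte[0])));
        selector.poll(1000);

        for (NetworkReceive receive : selector.completedReceives())
            System.out.println(receive.source() + " sent " + receive.payload().remaining() + " bytes");
        selector.close();
    }
}
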
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bdff518..1e90524 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -156,6 +156,9 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = 
"value.deserializer";
     private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer 
class for value that implements the <code>Deserializer</code> interface.";
 
+    /** <code>connections.max.idle.ms</code> */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -277,7 +280,13 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(VALUE_DESERIALIZER_CLASS_CONFIG,
                                         Type.CLASS,
                                         Importance.HIGH,
-                                        VALUE_DESERIALIZER_CLASS_DOC);
+                                        VALUE_DESERIALIZER_CLASS_DOC)
+                                /* default is set to be a bit lower than the 
server default (10 min), to avoid both client and server closing connection at 
same time */
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        9 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
     }
 
     public static Map<String, Object> addDeserializerToConfig(Map<String, 
Object> configs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d301be4..d1d1ec1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -472,7 +472,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             String metricGrpPrefix = "consumer";
             Map<String, String> metricsTags = new LinkedHashMap<String, 
String>();
             metricsTags.put("client-id", clientId);
-            this.client = new NetworkClient(new Selector(metrics, time, 
metricGrpPrefix, metricsTags),
+            this.client = new NetworkClient(
+                    new 
Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), 
metrics, time, metricGrpPrefix, metricsTags),
                     this.metadata,
                     clientId,
                     100, // a fixed large enough value will suffice

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index fac7995..c1496a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -445,7 +445,7 @@ public final class Coordinator {
         log.debug("Issuing consumer metadata request to broker {}", node.id());
 
         ConsumerMetadataRequest request = new 
ConsumerMetadataRequest(this.groupId);
-        RequestSend send = new RequestSend(node.id(),
+        RequestSend send = new RequestSend(node.idString(),
             this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
             request.toStruct());
         long now = time.milliseconds();
@@ -464,7 +464,7 @@ public final class Coordinator {
         log.debug("Issuing request ({}: {}) to coordinator {}", api, request, 
this.consumerCoordinator.id());
 
         RequestHeader header = this.client.nextRequestHeader(api);
-        RequestSend send = new RequestSend(this.consumerCoordinator.id(), 
header, request);
+        RequestSend send = new 
RequestSend(this.consumerCoordinator.idString(), header, request);
         return new ClientRequest(now, true, send, handler);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c5e577f..56281ee 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -124,7 +124,7 @@ public class Fetcher<K, V> {
      */
     public void initFetches(Cluster cluster, long now) {
         for (ClientRequest request : createFetchRequests(cluster)) {
-            Node node = cluster.nodeById(request.request().destination());
+            Node node = 
cluster.nodeById(Integer.parseInt(request.request().destination()));
             if (client.ready(node, now)) {
                 log.trace("Initiating fetch to node {}: {}", node.id(), 
request);
                 client.send(request);
@@ -209,12 +209,12 @@ public class Fetcher<K, V> {
             } else if (this.client.ready(info.leader(), now)) {
                 Node node = info.leader();
                 ListOffsetRequest request = new ListOffsetRequest(-1, 
partitions);
-                RequestSend send = new RequestSend(node.id(),
+                RequestSend send = new RequestSend(node.idString(),
                     this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS),
                     request.toStruct());
                 ClientRequest clientRequest = new ClientRequest(now, true, 
send, null);
                 this.client.send(clientRequest);
-                List<ClientResponse> responses = 
this.client.completeAll(node.id(), now);
+                List<ClientResponse> responses = 
this.client.completeAll(node.idString(), now);
                 if (responses.isEmpty())
                     throw new IllegalStateException("This should not happen.");
                 ClientResponse response = responses.get(responses.size() - 1);
@@ -258,7 +258,7 @@ public class Fetcher<K, V> {
         for (TopicPartition partition : subscriptions.assignedPartitions()) {
             Node node = cluster.leaderFor(partition);
             // if there is a leader and no in-flight requests, issue a new 
fetch
-            if (node != null && this.client.inFlightRequestCount(node.id()) == 
0) {
+            if (node != null && 
this.client.inFlightRequestCount(node.idString()) == 0) {
                 Map<TopicPartition, FetchRequest.PartitionData> fetch = 
fetchable.get(node.id());
                 if (fetch == null) {
                     fetch = new HashMap<TopicPartition, 
FetchRequest.PartitionData>();
@@ -274,7 +274,7 @@ public class Fetcher<K, V> {
         for (Map.Entry<Integer, Map<TopicPartition, 
FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             int nodeId = entry.getKey();
             final FetchRequest fetch = new FetchRequest(this.maxWaitMs, 
this.minBytes, entry.getValue());
-            RequestSend send = new RequestSend(nodeId, 
this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
+            RequestSend send = new RequestSend(Integer.toString(nodeId), 
this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
             RequestCompletionHandler handler = new RequestCompletionHandler() {
                 public void onComplete(ClientResponse response) {
                     handleFetchResponse(response, fetch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ded19d8..5a37580 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -227,7 +227,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 
time.milliseconds());
 
-            NetworkClient client = new NetworkClient(new 
Selector(this.metrics, time, "producer", metricTags),
+            NetworkClient client = new NetworkClient(
+                    new 
Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), 
this.metrics, time, "producer", metricTags),
                     this.metadata,
                     clientId,
                     
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 023bd2e..aa26420 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -169,11 +169,13 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = 
"value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class 
for value that implements the <code>Serializer</code> interface.";
 
+    /** <code>connections.max.idle.ms</code> */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+
     /** <code>partitioner.class</code> */
     public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
     private static final String PARTITIONER_CLASS_DOC = "Partitioner class 
that implements the <code>Partitioner</code> interface.";
 
-
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 
1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -223,6 +225,8 @@ public class ProducerConfig extends AbstractConfig {
                                         
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
                                 .define(KEY_SERIALIZER_CLASS_CONFIG, 
Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
                                 .define(VALUE_SERIALIZER_CLASS_CONFIG, 
Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
+                                /* default is set to be a bit lower than the 
server default (10 min), to avoid both client and server closing connection at 
same time */
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, 
Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, 
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, 
"org.apache.kafka.clients.producer.internals.DefaultPartitioner", 
Importance.MEDIUM, PARTITIONER_CLASS_DOC);
     }
 

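The new connections.max.idle.ms client setting defaults to nine minutes, deliberately a
bit below the server's ten-minute default so the client normally closes an idle
connection before the broker does. A minimal sketch of overriding it when building a
producer (the broker address and serializer classes are placeholders):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdleConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Close idle connections after 5 minutes instead of the 9-minute default.
        props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "300000");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.close();
    }
}
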
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 1e943d6..07e65d4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -320,7 +320,7 @@ public class Sender implements Runnable {
             recordsByPartition.put(tp, batch);
         }
         ProduceRequest request = new ProduceRequest(acks, timeout, 
produceRecordsByPartition);
-        RequestSend send = new RequestSend(destination,
+        RequestSend send = new RequestSend(Integer.toString(destination),
                                            
this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                            request.toStruct());
         RequestCompletionHandler callback = new RequestCompletionHandler() {
@@ -505,10 +505,10 @@ public class Sender implements Runnable {
                 topicErrorSensor.record(count, now);
         }
 
-        public void recordLatency(int node, long latency) {
+        public void recordLatency(String node, long latency) {
             long now = time.milliseconds();
             this.requestTimeSensor.record(latency, now);
-            if (node >= 0) {
+            if (!node.isEmpty()) {
                 String nodeTimeName = "node-" + node + ".latency";
                 Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
                 if (nodeRequestTime != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java 
b/clients/src/main/java/org/apache/kafka/common/Node.java
index f4e4186..644cd71 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -18,12 +18,14 @@ package org.apache.kafka.common;
 public class Node {
 
     private final int id;
+    private final String idString;
     private final String host;
     private final int port;
 
     public Node(int id, String host, int port) {
         super();
         this.id = id;
+        this.idString = Integer.toString(id);
         this.host = host;
         this.port = port;
     }
@@ -40,6 +42,14 @@ public class Node {
     }
 
     /**
+     * String representation of the node id.
+     * Typically the integer id is used to serialize over the wire, the string 
representation is used as an identifier with NetworkClient code
+     */
+    public String idString() {
+        return idString;
+    }
+
+    /**
      * The host name for this node
      */
     public String host() {

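Node keeps the integer id for the wire protocol and precomputes idString() once so the
client networking code can use it as a connection identifier without converting on every
lookup. For illustration (hypothetical host and port):

import org.apache.kafka.common.Node;

public class NodeIdSketch {
    public static void main(String[] args) {
        Node node = new Node(1, "broker1.example.com", 9092);
        System.out.println(node.id());        // 1   -> still used when serializing requests
        System.out.println(node.idString());  // "1" -> used as the Selector connection id
    }
}
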
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
index 129ae82..159c301 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java
@@ -25,11 +25,11 @@ import java.nio.channels.ScatteringByteChannel;
  */
 public class ByteBufferReceive implements Receive {
 
-    private final int source;
+    private final String source;
     private final ByteBuffer[] buffers;
     private int remaining;
 
-    public ByteBufferReceive(int source, ByteBuffer... buffers) {
+    public ByteBufferReceive(String source, ByteBuffer... buffers) {
         super();
         this.source = source;
         this.buffers = buffers;
@@ -38,7 +38,7 @@ public class ByteBufferReceive implements Receive {
     }
 
     @Override
-    public int source() {
+    public String source() {
         return source;
     }
 
@@ -54,8 +54,4 @@ public class ByteBufferReceive implements Receive {
         return read;
     }
 
-    public ByteBuffer[] reify() {
-        return buffers;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index c8213e1..df0e6d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -22,12 +22,12 @@ import java.nio.channels.GatheringByteChannel;
  */
 public class ByteBufferSend implements Send {
 
-    private final int destination;
+    private final String destination;
     protected final ByteBuffer[] buffers;
     private int remaining;
     private int size;
 
-    public ByteBufferSend(int destination, ByteBuffer... buffers) {
+    public ByteBufferSend(String destination, ByteBuffer... buffers) {
         super();
         this.destination = destination;
         this.buffers = buffers;
@@ -37,7 +37,7 @@ public class ByteBufferSend implements Send {
     }
 
     @Override
-    public int destination() {
+    public String destination() {
         return destination;
     }
 
@@ -47,16 +47,7 @@ public class ByteBufferSend implements Send {
     }
 
     @Override
-    public ByteBuffer[] reify() {
-        return this.buffers;
-    }
-
-    @Override
-    public int remaining() {
-        return this.remaining;
-    }
-
-    public int size() {
+    public long size() {
         return this.size;
     }
 
@@ -64,9 +55,8 @@ public class ByteBufferSend implements Send {
     public long writeTo(GatheringByteChannel channel) throws IOException {
         long written = channel.write(buffers);
         if (written < 0)
-            throw new EOFException("This shouldn't happen.");
+            throw new EOFException("Wrote negative bytes to channel. This 
shouldn't happen.");
         remaining -= written;
         return written;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java
 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java
new file mode 100644
index 0000000..a5bdd62
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidReceiveException extends KafkaException {
+
+    public InvalidReceiveException(String message) {
+        super(message);
+    }
+
+    public InvalidReceiveException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
new file mode 100644
index 0000000..0e14a39
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A set of composite sends, sent one after another
+ */
+
+public class MultiSend implements Send {
+
+    private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
+    private String dest;
+    private long totalWritten = 0;
+    private List<Send> sends;
+    private Iterator<Send> sendsIterator;
+    private Send current;
+    private boolean doneSends = false;
+    private long size = 0;
+
+    public MultiSend(String dest, List<Send> sends) {
+        this.dest = dest;
+        this.sends = sends;
+        this.sendsIterator = sends.iterator();
+        nextSendOrDone();
+        for (Send send: sends)
+            this.size += send.size();
+    }
+
+    @Override
+    public long size() {
+        return size;
+    }
+
+    @Override
+    public String destination() {
+        return dest;
+    }
+
+    @Override
+    public boolean completed() {
+        if (doneSends) {
+            if (totalWritten != size)
+                log.error("mismatch in sending bytes over socket; expected: " 
+ size + " actual: " + totalWritten);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public long writeTo(GatheringByteChannel channel) throws IOException {
+        if (completed())
+            throw new KafkaException("This operation cannot be completed on a 
complete request.");
+
+        int totalWrittenPerCall = 0;
+        boolean sendComplete = false;
+        do {
+            long written = current.writeTo(channel);
+            totalWritten += written;
+            totalWrittenPerCall += written;
+            sendComplete = current.completed();
+            if (sendComplete)
+                nextSendOrDone();
+        } while (!completed() && sendComplete);
+        if (log.isTraceEnabled())
+            log.trace("Bytes written as part of multisend call : " + 
totalWrittenPerCall +  "Total bytes written so far : " + totalWritten + 
"Expected bytes to write : " + size);
+        return totalWrittenPerCall;
+    }
+
+    // update current if there's a next Send, mark sends as done if there isn't
+    private void nextSendOrDone() {
+        if (sendsIterator.hasNext())
+            current = sendsIterator.next();
+        else
+            doneSends = true;
+    }
+}
\ No newline at end of file

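MultiSend chains several Send objects behind one destination id and writes them in
order until every byte has been flushed, so a large response can be assembled from
pieces without copying them into a single buffer. A minimal sketch of composing one
(the connection id and payloads are placeholders):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.MultiSend;
import org.apache.kafka.common.network.Send;

public class MultiSendSketch {
    public static Send compose(String connectionId) {
        List<Send> parts = new ArrayList<Send>();
        parts.add(new ByteBufferSend(connectionId, ByteBuffer.wrap("header".getBytes())));
        parts.add(new ByteBufferSend(connectionId, ByteBuffer.wrap("body".getBytes())));
        // size() is the sum of the parts; writeTo() drains them one after another
        // and completed() only turns true once the total has been written.
        return new MultiSend(connectionId, parts);
    }
}
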
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index fc0d168..3ca0098 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.network;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 
 /**
@@ -22,24 +23,42 @@ import java.nio.channels.ScatteringByteChannel;
  */
 public class NetworkReceive implements Receive {
 
-    private final int source;
+    public final static String UNKNOWN_SOURCE = "";
+    public final static int UNLIMITED = -1;
+
+    private final String source;
     private final ByteBuffer size;
+    private final int maxSize;
     private ByteBuffer buffer;
 
-    public NetworkReceive(int source, ByteBuffer buffer) {
+
+    public NetworkReceive(String source, ByteBuffer buffer) {
         this.source = source;
         this.buffer = buffer;
         this.size = null;
+        this.maxSize = UNLIMITED;
+    }
+
+    public NetworkReceive(String source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = UNLIMITED;
     }
 
-    public NetworkReceive(int source) {
+    public NetworkReceive(int maxSize, String source) {
         this.source = source;
         this.size = ByteBuffer.allocate(4);
         this.buffer = null;
+        this.maxSize = maxSize;
+    }
+
+    public NetworkReceive() {
+        this(UNKNOWN_SOURCE);
     }
 
     @Override
-    public int source() {
+    public String source() {
         return source;
     }
 
@@ -48,13 +67,15 @@ public class NetworkReceive implements Receive {
         return !size.hasRemaining() && !buffer.hasRemaining();
     }
 
-    @Override
-    public ByteBuffer[] reify() {
-        return new ByteBuffer[] {this.buffer};
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        return readFromReadableChannel(channel);
     }
 
-    @Override
-    public long readFrom(ScatteringByteChannel channel) throws IOException {
+    // Need a method to read from ReadableByteChannel because BlockingChannel 
requires read with timeout
+    // See: 
http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
+    // This can go away after we get rid of BlockingChannel
+    @Deprecated
+    public long readFromReadableChannel(ReadableByteChannel channel) throws 
IOException {
         int read = 0;
         if (size.hasRemaining()) {
             int bytesRead = channel.read(size);
@@ -63,10 +84,12 @@ public class NetworkReceive implements Receive {
             read += bytesRead;
             if (!size.hasRemaining()) {
                 size.rewind();
-                int requestSize = size.getInt();
-                if (requestSize < 0)
-                    throw new IllegalStateException("Invalid request (size = " 
+ requestSize + ")");
-                this.buffer = ByteBuffer.allocate(requestSize);
+                int receiveSize = size.getInt();
+                if (receiveSize < 0)
+                    throw new InvalidReceiveException("Invalid receive (size = 
" + receiveSize + ")");
+                if (maxSize != UNLIMITED && receiveSize > maxSize)
+                    throw new InvalidReceiveException("Invalid receive (size = 
" + receiveSize + " larger than " + maxSize + ")");
+                this.buffer = ByteBuffer.allocate(receiveSize);
             }
         }
         if (buffer != null) {
@@ -83,4 +106,14 @@ public class NetworkReceive implements Receive {
         return this.buffer;
     }
 
+    // Used only by BlockingChannel, so we may be able to get rid of this 
when/if we get rid of BlockingChannel
+    @Deprecated
+    public long readCompletely(ReadableByteChannel channel) throws IOException 
{
+        int totalRead = 0;
+        while (!complete()) {
+            totalRead += readFromReadableChannel(channel);
+        }
+        return totalRead;
+    }
+
 }

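NetworkReceive now carries an optional maxSize so the receiving side can reject an
oversized (or negative-sized) frame up front instead of blindly allocating whatever
length the peer declares; the framing itself is unchanged: a 4-byte big-endian length
followed by the payload. A minimal sketch of reading one bounded frame (the connected
SocketChannel, connection id and 100 MB limit are assumptions):

import java.io.IOException;
import java.nio.channels.SocketChannel;

import org.apache.kafka.common.network.NetworkReceive;

public class BoundedReceiveSketch {
    public static void readOneFrame(SocketChannel channel) throws IOException {
        int maxSize = 100 * 1024 * 1024;  // assumed cap on a single receive
        NetworkReceive receive = new NetworkReceive(maxSize, "broker-0");
        // Keep reading until the 4-byte size header and the payload are both in;
        // an InvalidReceiveException is thrown if the declared size is negative
        // or exceeds maxSize.
        while (!receive.complete())
            receive.readFrom(channel);
        System.out.println("payload: " + receive.payload().remaining() + " bytes");
    }
}
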
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 68327cd..49964b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer;
  */
 public class NetworkSend extends ByteBufferSend {
 
-    public NetworkSend(int destination, ByteBuffer... buffers) {
+    public NetworkSend(String destination, ByteBuffer... buffers) {
         super(destination, sizeDelimit(buffers));
     }
 

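NetworkSend is the sending-side counterpart: it prepends the 4-byte size header that
NetworkReceive strips off. A minimal sketch of writing one frame (the connected
SocketChannel and connection id are assumptions):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

import org.apache.kafka.common.network.NetworkSend;

public class FramedSendSketch {
    public static void writeOneFrame(SocketChannel channel, byte[] payload) throws IOException {
        NetworkSend send = new NetworkSend("broker-0", ByteBuffer.wrap(payload));
        // writeTo() may need several calls on a non-blocking channel; completed()
        // reports when the size header and payload have both been written.
        while (!send.completed())
            send.writeTo(channel);  // SocketChannel is a GatheringByteChannel
    }
}
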
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java 
b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
index 4e33078..4b14431 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.network;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ScatteringByteChannel;
 
 /**
@@ -28,7 +27,7 @@ public interface Receive {
     /**
      * The numeric id of the source from which we are receiving data.
      */
-    public int source();
+    public String source();
 
     /**
      * Are we done receiving data?
@@ -36,11 +35,6 @@ public interface Receive {
     public boolean complete();
 
     /**
-     * Turn this receive into ByteBuffer instances, if possible (otherwise 
returns null).
-     */
-    public ByteBuffer[] reify();
-
-    /**
      * Read bytes into this receive from the given channel
      * @param channel The channel to read from
      * @return The number of bytes read

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index b5f8d83..618a0fa 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -29,12 +29,12 @@ public interface Selectable {
      * @param receiveBufferSize The receive buffer for the socket
      * @throws IOException If we cannot begin connecting
      */
-    public void connect(int id, InetSocketAddress address, int sendBufferSize, 
int receiveBufferSize) throws IOException;
+    public void connect(String id, InetSocketAddress address, int 
sendBufferSize, int receiveBufferSize) throws IOException;
 
     /**
      * Begin disconnecting the connection identified by the given id
      */
-    public void disconnect(int id);
+    public void disconnect(String id);
 
     /**
      * Wakeup this selector if it is blocked on I/O
@@ -50,7 +50,7 @@ public interface Selectable {
      * Queue the given request for sending in the subsequent {@poll(long)} 
calls
      * @param send The request to send
      */
-    public void send(NetworkSend send);
+    public void send(Send send);
 
     /**
      * Do I/O. Reads, writes, connection establishment, etc.
@@ -62,7 +62,7 @@ public interface Selectable {
     /**
      * The list of sends that completed on the last {@link #poll(long, List) 
poll()} call.
      */
-    public List<NetworkSend> completedSends();
+    public List<Send> completedSends();
 
     /**
      * The list of receives that completed on the last {@link #poll(long, 
List) poll()} call.
@@ -73,25 +73,25 @@ public interface Selectable {
      * The list of connections that finished disconnecting on the last {@link 
#poll(long, List) poll()}
      * call.
      */
-    public List<Integer> disconnected();
+    public List<String> disconnected();
 
     /**
      * The list of connections that completed their connection on the last 
{@link #poll(long, List) poll()}
      * call.
      */
-    public List<Integer> connected();
+    public List<String> connected();
 
     /**
      * Disable reads from the given connection
      * @param id The id for the connection
      */
-    public void mute(int id);
+    public void mute(String id);
 
     /**
      * Re-enable reads from the given connection
      * @param id The id for the connection
      */
-    public void unmute(int id);
+    public void unmute(String id);
 
     /**
      * Disable reads from all connections
