kafka-1928; Move kafka.network over to using the network classes in org.apache.kafka.common.network; patched by Gwen Shapira; reviewed by Joel Koshy, Jay Kreps, Jiangjie Qin, Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78ba492e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78ba492e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78ba492e Branch: refs/heads/trunk Commit: 78ba492e3e70fd9db61bc82469371d04a8d6b762 Parents: d22987f Author: Gwen Shapira <csh...@gmail.com> Authored: Wed Jun 3 21:40:35 2015 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Jun 3 21:40:35 2015 -0700 ---------------------------------------------------------------------- .../kafka/clients/ClusterConnectionStates.java | 78 ++--- .../kafka/clients/CommonClientConfigs.java | 2 + .../apache/kafka/clients/InFlightRequests.java | 18 +- .../org/apache/kafka/clients/KafkaClient.java | 8 +- .../org/apache/kafka/clients/NetworkClient.java | 59 ++-- .../kafka/clients/consumer/ConsumerConfig.java | 11 +- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../clients/consumer/internals/Coordinator.java | 4 +- .../clients/consumer/internals/Fetcher.java | 10 +- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../kafka/clients/producer/ProducerConfig.java | 6 +- .../clients/producer/internals/Sender.java | 6 +- .../main/java/org/apache/kafka/common/Node.java | 10 + .../kafka/common/network/ByteBufferReceive.java | 10 +- .../kafka/common/network/ByteBufferSend.java | 20 +- .../common/network/InvalidReceiveException.java | 30 ++ .../apache/kafka/common/network/MultiSend.java | 100 ++++++ .../kafka/common/network/NetworkReceive.java | 59 +++- .../kafka/common/network/NetworkSend.java | 2 +- .../apache/kafka/common/network/Receive.java | 8 +- .../apache/kafka/common/network/Selectable.java | 16 +- .../apache/kafka/common/network/Selector.java | 230 ++++++++----- .../org/apache/kafka/common/network/Send.java | 18 +- .../kafka/common/requests/RequestSend.java | 2 +- .../kafka/common/requests/ResponseSend.java | 41 +++ .../org/apache/kafka/clients/MockClient.java | 6 +- .../apache/kafka/clients/NetworkClientTest.java | 8 +- .../kafka/common/network/SelectorTest.java | 86 ++--- .../org/apache/kafka/test/MockSelector.java | 25 +- core/src/main/scala/kafka/Kafka.scala | 12 +- .../kafka/admin/ConsumerGroupCommand.scala | 2 +- .../kafka/api/ConsumerMetadataRequest.scala | 7 +- .../kafka/api/ControlledShutdownRequest.scala | 9 +- .../src/main/scala/kafka/api/FetchRequest.scala | 2 +- .../main/scala/kafka/api/FetchResponse.scala | 73 ++-- .../scala/kafka/api/LeaderAndIsrRequest.scala | 12 +- .../scala/kafka/api/OffsetCommitRequest.scala | 10 +- .../scala/kafka/api/OffsetFetchRequest.scala | 15 +- .../main/scala/kafka/api/OffsetRequest.scala | 7 +- .../main/scala/kafka/api/ProducerRequest.scala | 7 +- core/src/main/scala/kafka/api/RequestKeys.scala | 4 +- .../scala/kafka/api/StopReplicaRequest.scala | 4 +- .../scala/kafka/api/TopicMetadataRequest.scala | 8 +- .../scala/kafka/api/UpdateMetadataRequest.scala | 4 +- .../main/scala/kafka/client/ClientUtils.scala | 2 +- .../scala/kafka/consumer/SimpleConsumer.scala | 19 +- .../consumer/ZookeeperConsumerConnector.scala | 4 +- .../controller/ControllerChannelManager.scala | 11 +- .../kafka/javaapi/TopicMetadataRequest.scala | 7 +- .../scala/kafka/network/BlockingChannel.scala | 21 +- .../network/BoundedByteBufferReceive.scala | 90 ----- .../kafka/network/BoundedByteBufferSend.scala | 71 ---- .../scala/kafka/network/ByteBufferSend.scala | 40 --- core/src/main/scala/kafka/network/Handler.scala | 6 +- .../scala/kafka/network/RequestChannel.scala | 35 +- .../kafka/network/RequestOrResponseSend.scala | 57 ++++ .../main/scala/kafka/network/SocketServer.scala | 334 ++++++++----------- .../main/scala/kafka/network/Transmission.scala | 122 ------- .../scala/kafka/producer/SyncProducer.scala | 19 +- .../src/main/scala/kafka/server/KafkaApis.scala | 44 +-- .../main/scala/kafka/server/KafkaConfig.scala | 56 +++- .../main/scala/kafka/server/KafkaServer.scala | 33 +- .../scala/kafka/server/MessageSetSend.scala | 71 ---- .../kafka/tools/ConsumerOffsetChecker.scala | 2 +- .../scala/other/kafka/TestOffsetManager.scala | 6 +- .../test/scala/unit/kafka/KafkaConfigTest.scala | 17 +- .../unit/kafka/network/SocketServerTest.scala | 41 +-- .../kafka/server/KafkaConfigConfigDefTest.scala | 8 + 68 files changed, 1075 insertions(+), 1096 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index da76cc2..9ebda5e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -21,22 +21,22 @@ import java.util.Map; */ final class ClusterConnectionStates { private final long reconnectBackoffMs; - private final Map<Integer, NodeConnectionState> nodeState; + private final Map<String, NodeConnectionState> nodeState; public ClusterConnectionStates(long reconnectBackoffMs) { this.reconnectBackoffMs = reconnectBackoffMs; - this.nodeState = new HashMap<Integer, NodeConnectionState>(); + this.nodeState = new HashMap<String, NodeConnectionState>(); } /** - * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not + * Return true iff we can currently initiate a new connection. This will be the case if we are not * connected and haven't been connected for at least the minimum reconnection backoff period. - * @param node The node id to check + * @param id The connection id to check * @param now The current time in MS * @return true if we can initiate a new connection */ - public boolean canConnect(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public boolean canConnect(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return true; else @@ -45,11 +45,11 @@ final class ClusterConnectionStates { /** * Return true if we are disconnected from the given node and can't re-establish a connection yet - * @param node The node to check + * @param id The connection to check * @param now The current time in ms */ - public boolean isBlackedOut(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public boolean isBlackedOut(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return false; else @@ -60,11 +60,11 @@ final class ClusterConnectionStates { * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled * connections. - * @param node The node to check + * @param id The connection to check * @param now The current time in ms */ - public long connectionDelay(int node, long now) { - NodeConnectionState state = nodeState.get(node); + public long connectionDelay(String id, long now) { + NodeConnectionState state = nodeState.get(id); if (state == null) return 0; long timeWaited = now - state.lastConnectAttemptMs; if (state.state == ConnectionState.DISCONNECTED) { @@ -77,67 +77,67 @@ final class ClusterConnectionStates { } /** - * Enter the connecting state for the given node. - * @param node The id of the node we are connecting to + * Enter the connecting state for the given connection. + * @param id The id of the connection * @param now The current time. */ - public void connecting(int node, long now) { - nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now)); + public void connecting(String id, long now) { + nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now)); } /** - * Return true iff we have a connection to the give node - * @param node The id of the node to check + * Return true iff a specific connection is connected + * @param id The id of the connection to check */ - public boolean isConnected(int node) { - NodeConnectionState state = nodeState.get(node); + public boolean isConnected(String id) { + NodeConnectionState state = nodeState.get(id); return state != null && state.state == ConnectionState.CONNECTED; } /** - * Return true iff we are in the process of connecting to the given node - * @param node The id of the node + * Return true iff we are in the process of connecting + * @param id The id of the connection */ - public boolean isConnecting(int node) { - NodeConnectionState state = nodeState.get(node); + public boolean isConnecting(String id) { + NodeConnectionState state = nodeState.get(id); return state != null && state.state == ConnectionState.CONNECTING; } /** - * Enter the connected state for the given node - * @param node The node we have connected to + * Enter the connected state for the given connection + * @param id The connection identifier */ - public void connected(int node) { - NodeConnectionState nodeState = nodeState(node); + public void connected(String id) { + NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.CONNECTED; } /** * Enter the disconnected state for the given node - * @param node The node we have disconnected from + * @param id The connection we have disconnected */ - public void disconnected(int node) { - NodeConnectionState nodeState = nodeState(node); + public void disconnected(String id) { + NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.DISCONNECTED; } /** - * Get the state of our connection to the given node - * @param node The id of the node + * Get the state of a given connection + * @param id The id of the connection * @return The state of our connection */ - public ConnectionState connectionState(int node) { - return nodeState(node).state; + public ConnectionState connectionState(String id) { + return nodeState(id).state; } /** * Get the state of a given node - * @param node The node to fetch the state for + * @param id The connection to fetch the state for */ - private NodeConnectionState nodeState(int node) { - NodeConnectionState state = this.nodeState.get(node); + private NodeConnectionState nodeState(String id) { + NodeConnectionState state = this.nodeState.get(id); if (state == null) - throw new IllegalStateException("No entry found for node " + node); + throw new IllegalStateException("No entry found for connection " + id); return state; } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index cf32e4e..2c421f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -55,4 +55,6 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; + public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config."; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 936487b..15d00d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -24,14 +24,14 @@ import java.util.Map; final class InFlightRequests { private final int maxInFlightRequestsPerConnection; - private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>(); + private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>(); public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; } /** - * Add the given request to the queue for the node it was directed to + * Add the given request to the queue for the connection it was directed to */ public void add(ClientRequest request) { Deque<ClientRequest> reqs = this.requests.get(request.request().destination()); @@ -45,7 +45,7 @@ final class InFlightRequests { /** * Get the request queue for the given node */ - private Deque<ClientRequest> requestQueue(int node) { + private Deque<ClientRequest> requestQueue(String node) { Deque<ClientRequest> reqs = requests.get(node); if (reqs == null || reqs.isEmpty()) throw new IllegalStateException("Response from server for which there are no in-flight requests."); @@ -55,7 +55,7 @@ final class InFlightRequests { /** * Get the oldest request (the one that that will be completed next) for the given node */ - public ClientRequest completeNext(int node) { + public ClientRequest completeNext(String node) { return requestQueue(node).pollLast(); } @@ -63,7 +63,7 @@ final class InFlightRequests { * Get the last request we sent to the given node (but don't remove it from the queue) * @param node The node id */ - public ClientRequest lastSent(int node) { + public ClientRequest lastSent(String node) { return requestQueue(node).peekFirst(); } @@ -72,7 +72,7 @@ final class InFlightRequests { * @param node The node the request was sent to * @return The request */ - public ClientRequest completeLastSent(int node) { + public ClientRequest completeLastSent(String node) { return requestQueue(node).pollFirst(); } @@ -82,7 +82,7 @@ final class InFlightRequests { * @param node Node in question * @return true iff we have no requests still being sent to the given node */ - public boolean canSendMore(int node) { + public boolean canSendMore(String node) { Deque<ClientRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); @@ -93,7 +93,7 @@ final class InFlightRequests { * @param node The node * @return The request count. */ - public int inFlightRequestCount(int node) { + public int inFlightRequestCount(String node) { Deque<ClientRequest> queue = requests.get(node); return queue == null ? 0 : queue.size(); } @@ -114,7 +114,7 @@ final class InFlightRequests { * @param node The node * @return All the in-flight requests for that node that have been removed */ - public Iterable<ClientRequest> clearAll(int node) { + public Iterable<ClientRequest> clearAll(String node) { Deque<ClientRequest> reqs = requests.get(node); if (reqs == null) { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 1311f85..7ab2503 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -81,13 +81,13 @@ public interface KafkaClient extends Closeable { public List<ClientResponse> poll(long timeout, long now); /** - * Complete all in-flight requests for a given node + * Complete all in-flight requests for a given connection * - * @param node The node to complete requests for + * @param id The connection to complete requests for * @param now The current time in ms * @return All requests that complete during this time period. */ - public List<ClientResponse> completeAll(int node, long now); + public List<ClientResponse> completeAll(String id, long now); /** * Complete all in-flight requests @@ -117,7 +117,7 @@ public interface KafkaClient extends Closeable { * * @param nodeId The id of the node */ - public int inFlightRequestCount(int nodeId); + public int inFlightRequestCount(String nodeId); /** * Generate a request header for the next request http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 435fbb5..48fe796 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -22,8 +22,8 @@ import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; @@ -111,7 +111,7 @@ public class NetworkClient implements KafkaClient { if (isReady(node, now)) return true; - if (connectionStates.canConnect(node.id(), now)) + if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one initiateConnect(node, now); @@ -129,7 +129,7 @@ public class NetworkClient implements KafkaClient { */ @Override public long connectionDelay(Node node, long now) { - return connectionStates.connectionDelay(node.id(), now); + return connectionStates.connectionDelay(node.idString(), now); } /** @@ -142,7 +142,7 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean connectionFailed(Node node) { - return connectionStates.connectionState(node.id()).equals(ConnectionState.DISCONNECTED); + return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); } /** @@ -154,7 +154,7 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean isReady(Node node, long now) { - int nodeId = node.id(); + String nodeId = node.idString(); if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) // if we need to update our metadata now declare all requests unready to make metadata requests first // priority @@ -165,11 +165,11 @@ public class NetworkClient implements KafkaClient { } /** - * Are we connected and ready and able to send more requests to the given node? + * Are we connected and ready and able to send more requests to the given connection? * * @param node The node */ - private boolean isSendable(int node) { + private boolean isSendable(String node) { return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); } @@ -179,7 +179,7 @@ public class NetworkClient implements KafkaClient { * @param node The node to check * @return The connection state */ - public ConnectionState connectionState(int node) { + public ConnectionState connectionState(String node) { return connectionStates.connectionState(node); } @@ -190,7 +190,7 @@ public class NetworkClient implements KafkaClient { */ @Override public void send(ClientRequest request) { - int nodeId = request.request().destination(); + String nodeId = request.request().destination(); if (!isSendable(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); @@ -252,7 +252,7 @@ public class NetworkClient implements KafkaClient { * @return All the collected responses */ @Override - public List<ClientResponse> completeAll(int node, long now) { + public List<ClientResponse> completeAll(String node, long now) { try { this.selector.muteAll(); this.selector.unmute(node); @@ -288,8 +288,8 @@ public class NetworkClient implements KafkaClient { * Get the number of in-flight requests for a given node */ @Override - public int inFlightRequestCount(int nodeId) { - return this.inFlightRequests.inFlightRequestCount(nodeId); + public int inFlightRequestCount(String node) { + return this.inFlightRequests.inFlightRequestCount(node); } /** @@ -334,11 +334,11 @@ public class NetworkClient implements KafkaClient { for (int i = 0; i < nodes.size(); i++) { int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size()); Node node = nodes.get(idx); - int currInflight = this.inFlightRequests.inFlightRequestCount(node.id()); - if (currInflight == 0 && this.connectionStates.isConnected(node.id())) { + int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString()); + if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) { // if we find an established connection with no in-flight requests we can stop right away return node; - } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) { + } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; found = node; @@ -355,7 +355,7 @@ public class NetworkClient implements KafkaClient { */ private void handleCompletedSends(List<ClientResponse> responses, long now) { // if no response is expected then when the send is completed, return it - for (NetworkSend send : this.selector.completedSends()) { + for (Send send : this.selector.completedSends()) { ClientRequest request = this.inFlightRequests.lastSent(send.destination()); if (!request.expectResponse()) { this.inFlightRequests.completeLastSent(send.destination()); @@ -372,7 +372,7 @@ public class NetworkClient implements KafkaClient { */ private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { - int source = receive.source(); + String source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); ResponseHeader header = ResponseHeader.parse(receive.payload()); short apiKey = req.request().header().apiKey(); @@ -412,7 +412,7 @@ public class NetworkClient implements KafkaClient { * @param now The current time */ private void handleDisconnections(List<ClientResponse> responses, long now) { - for (int node : this.selector.disconnected()) { + for (String node : this.selector.disconnected()) { connectionStates.disconnected(node); log.debug("Node {} disconnected.", node); for (ClientRequest request : this.inFlightRequests.clearAll(node)) { @@ -433,9 +433,9 @@ public class NetworkClient implements KafkaClient { * Record any newly completed connections */ private void handleConnections() { - for (Integer id : this.selector.connected()) { - log.debug("Completed connection to node {}", id); - this.connectionStates.connected(id); + for (String node : this.selector.connected()) { + log.debug("Completed connection to node {}", node); + this.connectionStates.connected(node); } } @@ -451,7 +451,7 @@ public class NetworkClient implements KafkaClient { /** * Create a metadata request for the given topics */ - private ClientRequest metadataRequest(long now, int node, Set<String> topics) { + private ClientRequest metadataRequest(long now, String node, Set<String> topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics)); RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); return new ClientRequest(now, true, send, null); @@ -470,15 +470,17 @@ public class NetworkClient implements KafkaClient { this.lastNoNodeAvailableMs = now; return; } + String nodeConnectionId = node.idString(); - if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + + if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) { Set<String> topics = metadata.topics(); this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); this.selector.send(metadataRequest.request()); this.inFlightRequests.add(metadataRequest); - } else if (connectionStates.canConnect(node.id(), now)) { + } else if (connectionStates.canConnect(nodeConnectionId, now)) { // we don't have a connection to this node right now, make one log.debug("Initialize connection to node {} for sending metadata request", node.id()); initiateConnect(node, now); @@ -497,16 +499,17 @@ public class NetworkClient implements KafkaClient { * Initiate a connection to the given node */ private void initiateConnect(Node node, long now) { + String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); - this.connectionStates.connecting(node.id(), now); - selector.connect(node.id(), + this.connectionStates.connecting(nodeConnectionId, now); + selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ - connectionStates.disconnected(node.id()); + connectionStates.disconnected(nodeConnectionId); /* maybe the problem is our metadata, update it */ metadata.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index bdff518..1e90524 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -156,6 +156,9 @@ public class ConsumerConfig extends AbstractConfig { public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface."; + /** <code>connections.max.idle.ms</code> */ + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -277,7 +280,13 @@ public class ConsumerConfig extends AbstractConfig { .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, - VALUE_DESERIALIZER_CLASS_DOC); + VALUE_DESERIALIZER_CLASS_DOC) + /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + Type.LONG, + 9 * 60 * 1000, + Importance.MEDIUM, + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC); } public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs, http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d301be4..d1d1ec1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -472,7 +472,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { String metricGrpPrefix = "consumer"; Map<String, String> metricsTags = new LinkedHashMap<String, String>(); metricsTags.put("client-id", clientId); - this.client = new NetworkClient(new Selector(metrics, time, metricGrpPrefix, metricsTags), + this.client = new NetworkClient( + new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags), this.metadata, clientId, 100, // a fixed large enough value will suffice http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index fac7995..c1496a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -445,7 +445,7 @@ public final class Coordinator { log.debug("Issuing consumer metadata request to broker {}", node.id()); ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.groupId); - RequestSend send = new RequestSend(node.id(), + RequestSend send = new RequestSend(node.idString(), this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA), request.toStruct()); long now = time.milliseconds(); @@ -464,7 +464,7 @@ public final class Coordinator { log.debug("Issuing request ({}: {}) to coordinator {}", api, request, this.consumerCoordinator.id()); RequestHeader header = this.client.nextRequestHeader(api); - RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request); + RequestSend send = new RequestSend(this.consumerCoordinator.idString(), header, request); return new ClientRequest(now, true, send, handler); } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index c5e577f..56281ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -124,7 +124,7 @@ public class Fetcher<K, V> { */ public void initFetches(Cluster cluster, long now) { for (ClientRequest request : createFetchRequests(cluster)) { - Node node = cluster.nodeById(request.request().destination()); + Node node = cluster.nodeById(Integer.parseInt(request.request().destination())); if (client.ready(node, now)) { log.trace("Initiating fetch to node {}: {}", node.id(), request); client.send(request); @@ -209,12 +209,12 @@ public class Fetcher<K, V> { } else if (this.client.ready(info.leader(), now)) { Node node = info.leader(); ListOffsetRequest request = new ListOffsetRequest(-1, partitions); - RequestSend send = new RequestSend(node.id(), + RequestSend send = new RequestSend(node.idString(), this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS), request.toStruct()); ClientRequest clientRequest = new ClientRequest(now, true, send, null); this.client.send(clientRequest); - List<ClientResponse> responses = this.client.completeAll(node.id(), now); + List<ClientResponse> responses = this.client.completeAll(node.idString(), now); if (responses.isEmpty()) throw new IllegalStateException("This should not happen."); ClientResponse response = responses.get(responses.size() - 1); @@ -258,7 +258,7 @@ public class Fetcher<K, V> { for (TopicPartition partition : subscriptions.assignedPartitions()) { Node node = cluster.leaderFor(partition); // if there is a leader and no in-flight requests, issue a new fetch - if (node != null && this.client.inFlightRequestCount(node.id()) == 0) { + if (node != null && this.client.inFlightRequestCount(node.idString()) == 0) { Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id()); if (fetch == null) { fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>(); @@ -274,7 +274,7 @@ public class Fetcher<K, V> { for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) { int nodeId = entry.getKey(); final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue()); - RequestSend send = new RequestSend(nodeId, this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct()); + RequestSend send = new RequestSend(Integer.toString(nodeId), this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct()); RequestCompletionHandler handler = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleFetchResponse(response, fetch); http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ded19d8..5a37580 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -227,7 +227,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags), + NetworkClient client = new NetworkClient( + new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 023bd2e..aa26420 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -169,11 +169,13 @@ public class ProducerConfig extends AbstractConfig { public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface."; + /** <code>connections.max.idle.ms</code> */ + public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; + /** <code>partitioner.class</code> */ public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the <code>Partitioner</code> interface."; - static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -223,6 +225,8 @@ public class ProducerConfig extends AbstractConfig { MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC); } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1e943d6..07e65d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -320,7 +320,7 @@ public class Sender implements Runnable { recordsByPartition.put(tp, batch); } ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); - RequestSend send = new RequestSend(destination, + RequestSend send = new RequestSend(Integer.toString(destination), this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); RequestCompletionHandler callback = new RequestCompletionHandler() { @@ -505,10 +505,10 @@ public class Sender implements Runnable { topicErrorSensor.record(count, now); } - public void recordLatency(int node, long latency) { + public void recordLatency(String node, long latency) { long now = time.milliseconds(); this.requestTimeSensor.record(latency, now); - if (node >= 0) { + if (!node.isEmpty()) { String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); if (nodeRequestTime != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/Node.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index f4e4186..644cd71 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -18,12 +18,14 @@ package org.apache.kafka.common; public class Node { private final int id; + private final String idString; private final String host; private final int port; public Node(int id, String host, int port) { super(); this.id = id; + this.idString = Integer.toString(id); this.host = host; this.port = port; } @@ -40,6 +42,14 @@ public class Node { } /** + * String representation of the node id. + * Typically the integer id is used to serialize over the wire, the string representation is used as an identifier with NetworkClient code + */ + public String idString() { + return idString; + } + + /** * The host name for this node */ public String host() { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java index 129ae82..159c301 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java @@ -25,11 +25,11 @@ import java.nio.channels.ScatteringByteChannel; */ public class ByteBufferReceive implements Receive { - private final int source; + private final String source; private final ByteBuffer[] buffers; private int remaining; - public ByteBufferReceive(int source, ByteBuffer... buffers) { + public ByteBufferReceive(String source, ByteBuffer... buffers) { super(); this.source = source; this.buffers = buffers; @@ -38,7 +38,7 @@ public class ByteBufferReceive implements Receive { } @Override - public int source() { + public String source() { return source; } @@ -54,8 +54,4 @@ public class ByteBufferReceive implements Receive { return read; } - public ByteBuffer[] reify() { - return buffers; - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index c8213e1..df0e6d5 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -22,12 +22,12 @@ import java.nio.channels.GatheringByteChannel; */ public class ByteBufferSend implements Send { - private final int destination; + private final String destination; protected final ByteBuffer[] buffers; private int remaining; private int size; - public ByteBufferSend(int destination, ByteBuffer... buffers) { + public ByteBufferSend(String destination, ByteBuffer... buffers) { super(); this.destination = destination; this.buffers = buffers; @@ -37,7 +37,7 @@ public class ByteBufferSend implements Send { } @Override - public int destination() { + public String destination() { return destination; } @@ -47,16 +47,7 @@ public class ByteBufferSend implements Send { } @Override - public ByteBuffer[] reify() { - return this.buffers; - } - - @Override - public int remaining() { - return this.remaining; - } - - public int size() { + public long size() { return this.size; } @@ -64,9 +55,8 @@ public class ByteBufferSend implements Send { public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) - throw new EOFException("This shouldn't happen."); + throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; return written; } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java b/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java new file mode 100644 index 0000000..a5bdd62 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/InvalidReceiveException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.KafkaException; + +public class InvalidReceiveException extends KafkaException { + + public InvalidReceiveException(String message) { + super(message); + } + + public InvalidReceiveException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java new file mode 100644 index 0000000..0e14a39 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.KafkaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.channels.GatheringByteChannel; +import java.util.Iterator; +import java.util.List; + +/** + * A set of composite sends, sent one after another + */ + +public class MultiSend implements Send { + + private static final Logger log = LoggerFactory.getLogger(MultiSend.class); + private String dest; + private long totalWritten = 0; + private List<Send> sends; + private Iterator<Send> sendsIterator; + private Send current; + private boolean doneSends = false; + private long size = 0; + + public MultiSend(String dest, List<Send> sends) { + this.dest = dest; + this.sends = sends; + this.sendsIterator = sends.iterator(); + nextSendOrDone(); + for (Send send: sends) + this.size += send.size(); + } + + @Override + public long size() { + return size; + } + + @Override + public String destination() { + return dest; + } + + @Override + public boolean completed() { + if (doneSends) { + if (totalWritten != size) + log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten); + return true; + } else { + return false; + } + } + + @Override + public long writeTo(GatheringByteChannel channel) throws IOException { + if (completed()) + throw new KafkaException("This operation cannot be completed on a complete request."); + + int totalWrittenPerCall = 0; + boolean sendComplete = false; + do { + long written = current.writeTo(channel); + totalWritten += written; + totalWrittenPerCall += written; + sendComplete = current.completed(); + if (sendComplete) + nextSendOrDone(); + } while (!completed() && sendComplete); + if (log.isTraceEnabled()) + log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size); + return totalWrittenPerCall; + } + + // update current if there's a next Send, mark sends as done if there isn't + private void nextSendOrDone() { + if (sendsIterator.hasNext()) + current = sendsIterator.next(); + else + doneSends = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index fc0d168..3ca0098 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.network; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; /** @@ -22,24 +23,42 @@ import java.nio.channels.ScatteringByteChannel; */ public class NetworkReceive implements Receive { - private final int source; + public final static String UNKNOWN_SOURCE = ""; + public final static int UNLIMITED = -1; + + private final String source; private final ByteBuffer size; + private final int maxSize; private ByteBuffer buffer; - public NetworkReceive(int source, ByteBuffer buffer) { + + public NetworkReceive(String source, ByteBuffer buffer) { this.source = source; this.buffer = buffer; this.size = null; + this.maxSize = UNLIMITED; + } + + public NetworkReceive(String source) { + this.source = source; + this.size = ByteBuffer.allocate(4); + this.buffer = null; + this.maxSize = UNLIMITED; } - public NetworkReceive(int source) { + public NetworkReceive(int maxSize, String source) { this.source = source; this.size = ByteBuffer.allocate(4); this.buffer = null; + this.maxSize = maxSize; + } + + public NetworkReceive() { + this(UNKNOWN_SOURCE); } @Override - public int source() { + public String source() { return source; } @@ -48,13 +67,15 @@ public class NetworkReceive implements Receive { return !size.hasRemaining() && !buffer.hasRemaining(); } - @Override - public ByteBuffer[] reify() { - return new ByteBuffer[] {this.buffer}; + public long readFrom(ScatteringByteChannel channel) throws IOException { + return readFromReadableChannel(channel); } - @Override - public long readFrom(ScatteringByteChannel channel) throws IOException { + // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work + // This can go away after we get rid of BlockingChannel + @Deprecated + public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; if (size.hasRemaining()) { int bytesRead = channel.read(size); @@ -63,10 +84,12 @@ public class NetworkReceive implements Receive { read += bytesRead; if (!size.hasRemaining()) { size.rewind(); - int requestSize = size.getInt(); - if (requestSize < 0) - throw new IllegalStateException("Invalid request (size = " + requestSize + ")"); - this.buffer = ByteBuffer.allocate(requestSize); + int receiveSize = size.getInt(); + if (receiveSize < 0) + throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); + if (maxSize != UNLIMITED && receiveSize > maxSize) + throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); + this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { @@ -83,4 +106,14 @@ public class NetworkReceive implements Receive { return this.buffer; } + // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel + @Deprecated + public long readCompletely(ReadableByteChannel channel) throws IOException { + int totalRead = 0; + while (!complete()) { + totalRead += readFromReadableChannel(channel); + } + return totalRead; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java index 68327cd..49964b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java @@ -23,7 +23,7 @@ import java.nio.ByteBuffer; */ public class NetworkSend extends ByteBufferSend { - public NetworkSend(int destination, ByteBuffer... buffers) { + public NetworkSend(String destination, ByteBuffer... buffers) { super(destination, sizeDelimit(buffers)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Receive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java index 4e33078..4b14431 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.network; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ScatteringByteChannel; /** @@ -28,7 +27,7 @@ public interface Receive { /** * The numeric id of the source from which we are receiving data. */ - public int source(); + public String source(); /** * Are we done receiving data? @@ -36,11 +35,6 @@ public interface Receive { public boolean complete(); /** - * Turn this receive into ByteBuffer instances, if possible (otherwise returns null). - */ - public ByteBuffer[] reify(); - - /** * Read bytes into this receive from the given channel * @param channel The channel to read from * @return The number of bytes read http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Selectable.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b5f8d83..618a0fa 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -29,12 +29,12 @@ public interface Selectable { * @param receiveBufferSize The receive buffer for the socket * @throws IOException If we cannot begin connecting */ - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException; + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException; /** * Begin disconnecting the connection identified by the given id */ - public void disconnect(int id); + public void disconnect(String id); /** * Wakeup this selector if it is blocked on I/O @@ -50,7 +50,7 @@ public interface Selectable { * Queue the given request for sending in the subsequent {@poll(long)} calls * @param send The request to send */ - public void send(NetworkSend send); + public void send(Send send); /** * Do I/O. Reads, writes, connection establishment, etc. @@ -62,7 +62,7 @@ public interface Selectable { /** * The list of sends that completed on the last {@link #poll(long, List) poll()} call. */ - public List<NetworkSend> completedSends(); + public List<Send> completedSends(); /** * The list of receives that completed on the last {@link #poll(long, List) poll()} call. @@ -73,25 +73,25 @@ public interface Selectable { * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()} * call. */ - public List<Integer> disconnected(); + public List<String> disconnected(); /** * The list of connections that completed their connection on the last {@link #poll(long, List) poll()} * call. */ - public List<Integer> connected(); + public List<String> connected(); /** * Disable reads from the given connection * @param id The id for the connection */ - public void mute(int id); + public void mute(String id); /** * Re-enable reads from the given connection * @param id The id for the connection */ - public void unmute(int id); + public void unmute(String id); /** * Disable reads from all connections