http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 57de058..effb1e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -17,17 +17,8 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.channels.UnresolvedAddressException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.nio.channels.*; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; @@ -40,20 +31,21 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A selector interface for doing non-blocking multi-connection network I/O. + * A nioSelector interface for doing non-blocking multi-connection network I/O. * <p> * This class works with {@link NetworkSend} and {@link NetworkReceive} to transmit size-delimited network requests and * responses. * <p> - * A connection can be added to the selector associated with an integer id by doing + * A connection can be added to the nioSelector associated with an integer id by doing * * <pre> - * selector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000); + * nioSelector.connect(42, new InetSocketAddress("google.com", server.port), 64000, 64000); * </pre> * * The connect call does not block on the creation of the TCP connection, so the connect method only begins initiating @@ -64,10 +56,10 @@ import org.slf4j.LoggerFactory; * * <pre> * List<NetworkRequest> requestsToSend = Arrays.asList(new NetworkRequest(0, myBytes), new NetworkRequest(1, myOtherBytes)); - * selector.poll(TIMEOUT_MS, requestsToSend); + * nioSelector.poll(TIMEOUT_MS, requestsToSend); * </pre> * - * The selector maintains several lists that are reset by each call to <code>poll()</code> which are available via + * The nioSelector maintains several lists that are reset by each call to <code>poll()</code> which are available via * various getters. These are reset by each call to <code>poll()</code>. * * This class is not thread safe! @@ -76,41 +68,59 @@ public class Selector implements Selectable { private static final Logger log = LoggerFactory.getLogger(Selector.class); - private final java.nio.channels.Selector selector; - private final Map<Integer, SelectionKey> keys; - private final List<NetworkSend> completedSends; + private final java.nio.channels.Selector nioSelector; + private final Map<String, SelectionKey> keys; + private final List<Send> completedSends; private final List<NetworkReceive> completedReceives; - private final List<Integer> disconnected; - private final List<Integer> connected; - private final List<Integer> failedSends; + private final List<String> disconnected; + private final List<String> connected; + private final List<String> failedSends; private final Time time; private final SelectorMetrics sensors; private final String metricGrpPrefix; private final Map<String, String> metricTags; + private final Map<String, Long> lruConnections; + private final long connectionsMaxIdleNanos; + private final int maxReceiveSize; + private final boolean metricsPerConnection; + private long currentTimeNanos; + private long nextIdleCloseCheckTime; + /** - * Create a new selector + * Create a new nioSelector */ - public Selector(Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) { + public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) { try { - this.selector = java.nio.channels.Selector.open(); + this.nioSelector = java.nio.channels.Selector.open(); } catch (IOException e) { throw new KafkaException(e); } + this.maxReceiveSize = maxReceiveSize; + this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000; this.time = time; this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; - this.keys = new HashMap<Integer, SelectionKey>(); - this.completedSends = new ArrayList<NetworkSend>(); + this.keys = new HashMap<String, SelectionKey>(); + this.completedSends = new ArrayList<Send>(); this.completedReceives = new ArrayList<NetworkReceive>(); - this.connected = new ArrayList<Integer>(); - this.disconnected = new ArrayList<Integer>(); - this.failedSends = new ArrayList<Integer>(); + this.connected = new ArrayList<String>(); + this.disconnected = new ArrayList<String>(); + this.failedSends = new ArrayList<String>(); this.sensors = new SelectorMetrics(metrics); + // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true + this.lruConnections = new LinkedHashMap<String, Long>(16, .75F, true); + currentTimeNanos = new SystemTime().nanoseconds(); + nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; + this.metricsPerConnection = metricsPerConnection; + } + + public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) { + this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true); } /** - * Begin connecting to the given address and add the connection to this selector associated with the given id + * Begin connecting to the given address and add the connection to this nioSelector associated with the given id * number. * <p> * Note that this call only initiates the connection, which will be completed on a future {@link #poll(long, List)} @@ -123,7 +133,7 @@ public class Selector implements Selectable { * @throws IOException if DNS resolution fails on the hostname or if the broker is down */ @Override - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.keys.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); @@ -143,7 +153,18 @@ public class Selector implements Selectable { channel.close(); throw e; } - SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); + SelectionKey key = channel.register(this.nioSelector, SelectionKey.OP_CONNECT); + key.attach(new Transmissions(id)); + this.keys.put(id, key); + } + + /** + * Register the nioSelector with an existing channel + * Use this on server-side, when a connection is accepted by a different thread but processed by the Selector + * Note that we are not checking if the connection id is valid - since the connection already exists + */ + public void register(String id, SocketChannel channel) throws ClosedChannelException { + SelectionKey key = channel.register(nioSelector, SelectionKey.OP_READ); key.attach(new Transmissions(id)); this.keys.put(id, key); } @@ -153,18 +174,18 @@ public class Selector implements Selectable { * processed until the next {@link #poll(long, List) poll()} call. */ @Override - public void disconnect(int id) { + public void disconnect(String id) { SelectionKey key = this.keys.get(id); if (key != null) key.cancel(); } /** - * Interrupt the selector if it is blocked waiting to do I/O. + * Interrupt the nioSelector if it is blocked waiting to do I/O. */ @Override public void wakeup() { - this.selector.wakeup(); + this.nioSelector.wakeup(); } /** @@ -172,12 +193,14 @@ public class Selector implements Selectable { */ @Override public void close() { - for (SelectionKey key : this.selector.keys()) - close(key); + List<String> connections = new LinkedList<String>(keys.keySet()); + for (String id: connections) + close(id); + try { - this.selector.close(); + this.nioSelector.close(); } catch (IOException e) { - log.error("Exception closing selector:", e); + log.error("Exception closing nioSelector:", e); } } @@ -185,7 +208,7 @@ public class Selector implements Selectable { * Queue the given request for sending in the subsequent {@poll(long)} calls * @param send The request to send */ - public void send(NetworkSend send) { + public void send(Send send) { SelectionKey key = keyForId(send.destination()); Transmissions transmissions = transmissions(key); if (transmissions.hasSend()) @@ -194,7 +217,7 @@ public class Selector implements Selectable { try { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException e) { - close(key); + close(transmissions.id); this.failedSends.add(send.destination()); } } @@ -220,10 +243,11 @@ public class Selector implements Selectable { long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); + currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0) { - Set<SelectionKey> keys = this.selector.selectedKeys(); + Set<SelectionKey> keys = this.nioSelector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -232,8 +256,9 @@ public class Selector implements Selectable { Transmissions transmissions = transmissions(key); SocketChannel channel = channel(key); - // register all per-broker metrics at once - sensors.maybeRegisterNodeMetrics(transmissions.id); + // register all per-connection metrics at once + sensors.maybeRegisterConnectionMetrics(transmissions.id); + lruConnections.put(transmissions.id, currentTimeNanos); try { /* complete any connections that have finished their handshake */ @@ -247,8 +272,14 @@ public class Selector implements Selectable { /* read from any connections that have readable data */ if (key.isReadable()) { if (!transmissions.hasReceive()) - transmissions.receive = new NetworkReceive(transmissions.id); - transmissions.receive.readFrom(channel); + transmissions.receive = new NetworkReceive(maxReceiveSize, transmissions.id); + try { + transmissions.receive.readFrom(channel); + } catch (InvalidReceiveException e) { + log.error("Invalid data received from " + transmissions.id + " closing connection", e); + close(transmissions.id); + throw e; + } if (transmissions.receive.complete()) { transmissions.receive.payload().rewind(); this.completedReceives.add(transmissions.receive); @@ -260,7 +291,7 @@ public class Selector implements Selectable { /* write to any sockets that have space in their buffer and for which we have data */ if (key.isWritable()) { transmissions.send.writeTo(channel); - if (transmissions.send.remaining() <= 0) { + if (transmissions.send.completed()) { this.completedSends.add(transmissions.send); this.sensors.recordBytesSent(transmissions.id, transmissions.send.size()); transmissions.clearSend(); @@ -270,7 +301,7 @@ public class Selector implements Selectable { /* cancel any defunct sockets */ if (!key.isValid()) { - close(key); + close(transmissions.id); this.disconnected.add(transmissions.id); } } catch (IOException e) { @@ -279,15 +310,16 @@ public class Selector implements Selectable { log.info("Connection {} disconnected", desc); else log.warn("Error in I/O with connection to {}", desc, e); - close(key); + close(transmissions.id); this.disconnected.add(transmissions.id); } } } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); + maybeCloseOldestConnection(); } - + private String socketDescription(SocketChannel channel) { Socket socket = channel.socket(); if (socket == null) @@ -299,7 +331,7 @@ public class Selector implements Selectable { } @Override - public List<NetworkSend> completedSends() { + public List<Send> completedSends() { return this.completedSends; } @@ -309,17 +341,17 @@ public class Selector implements Selectable { } @Override - public List<Integer> disconnected() { + public List<String> disconnected() { return this.disconnected; } @Override - public List<Integer> connected() { + public List<String> connected() { return this.connected; } @Override - public void mute(int id) { + public void mute(String id) { mute(this.keyForId(id)); } @@ -328,7 +360,7 @@ public class Selector implements Selectable { } @Override - public void unmute(int id) { + public void unmute(String id) { unmute(this.keyForId(id)); } @@ -348,6 +380,25 @@ public class Selector implements Selectable { unmute(key); } + private void maybeCloseOldestConnection() { + if (currentTimeNanos > nextIdleCloseCheckTime) { + if (lruConnections.isEmpty()) { + nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; + } else { + Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); + Long connectionLastActiveTime = oldestConnectionEntry.getValue(); + nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; + if (currentTimeNanos > nextIdleCloseCheckTime) { + String connectionId = oldestConnectionEntry.getKey(); + if (log.isTraceEnabled()) + log.trace("About to close the idle connection from " + connectionId + + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); + close(connectionId); + } + } + } + } + /** * Clear the results from the prior poll */ @@ -369,17 +420,19 @@ public class Selector implements Selectable { */ private int select(long ms) throws IOException { if (ms == 0L) - return this.selector.selectNow(); + return this.nioSelector.selectNow(); else if (ms < 0L) - return this.selector.select(); + return this.nioSelector.select(); else - return this.selector.select(ms); + return this.nioSelector.select(ms); } /** * Begin closing this connection */ - private void close(SelectionKey key) { + public void close(String id) { + SelectionKey key = keyForId(id); + lruConnections.remove(id); SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { @@ -401,10 +454,10 @@ public class Selector implements Selectable { /** * Get the selection key associated with this numeric id */ - private SelectionKey keyForId(int id) { + private SelectionKey keyForId(String id) { SelectionKey key = this.keys.get(id); if (key == null) - throw new IllegalStateException("Attempt to write to socket for which there is no open connection."); + throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + keys.keySet().toString()); return key; } @@ -426,11 +479,11 @@ public class Selector implements Selectable { * The id and in-progress send and receive associated with a connection */ private static class Transmissions { - public int id; - public NetworkSend send; + public String id; + public Send send; public NetworkReceive receive; - public Transmissions(int id) { + public Transmissions(String id) { this.id = id; } @@ -464,20 +517,27 @@ public class Selector implements Selectable { public SelectorMetrics(Metrics metrics) { this.metrics = metrics; String metricGrpName = metricGrpPrefix + "-metrics"; + StringBuilder tagsSuffix = new StringBuilder(); + + for (Map.Entry<String, String> tag: metricTags.entrySet()) { + tagsSuffix.append(tag.getKey()); + tagsSuffix.append("-"); + tagsSuffix.append(tag.getValue()); + } - this.connectionClosed = this.metrics.sensor("connections-closed"); + this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString()); MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags); this.connectionClosed.add(metricName, new Rate()); - this.connectionCreated = this.metrics.sensor("connections-created"); + this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString()); metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags); this.connectionCreated.add(metricName, new Rate()); - this.bytesTransferred = this.metrics.sensor("bytes-sent-received"); + this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString()); metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags); bytesTransferred.add(metricName, new Rate(new Count())); - this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); + this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred); metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags); this.bytesSent.add(metricName, new Rate()); metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags); @@ -487,13 +547,13 @@ public class Selector implements Selectable { metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags); this.bytesSent.add(metricName, new Max()); - this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred); + this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred); metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags); this.bytesReceived.add(metricName, new Rate()); metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags); this.bytesReceived.add(metricName, new Rate(new Count())); - this.selectTime = this.metrics.sensor("select-time"); + this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString()); metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags); this.selectTime.add(metricName, new Rate(new Count())); metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags); @@ -501,7 +561,7 @@ public class Selector implements Selectable { metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags); this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); - this.ioTime = this.metrics.sensor("io-time"); + this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString()); metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags); this.ioTime.add(metricName, new Avg()); metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags); @@ -515,17 +575,17 @@ public class Selector implements Selectable { }); } - public void maybeRegisterNodeMetrics(int node) { - if (node >= 0) { - // if one sensor of the metrics has been registered for the node, + public void maybeRegisterConnectionMetrics(String connectionId) { + if (!connectionId.isEmpty() && metricsPerConnection) { + // if one sensor of the metrics has been registered for the connection, // then all other sensors should have been registered; and vice versa - String nodeRequestName = "node-" + node + ".bytes-sent"; + String nodeRequestName = "node-" + connectionId + ".bytes-sent"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); if (nodeRequest == null) { String metricGrpName = metricGrpPrefix + "-node-metrics"; Map<String, String> tags = new LinkedHashMap<String, String>(metricTags); - tags.put("node-id", "node-" + node); + tags.put("node-id", "node-" + connectionId); nodeRequest = this.metrics.sensor(nodeRequestName); MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags); @@ -537,14 +597,14 @@ public class Selector implements Selectable { metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags); nodeRequest.add(metricName, new Max()); - String nodeResponseName = "node-" + node + ".bytes-received"; + String nodeResponseName = "node-" + connectionId + ".bytes-received"; Sensor nodeResponse = this.metrics.sensor(nodeResponseName); metricName = new MetricName("incoming-byte-rate", metricGrpName, tags); nodeResponse.add(metricName, new Rate()); metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags); nodeResponse.add(metricName, new Rate(new Count())); - String nodeTimeName = "node-" + node + ".latency"; + String nodeTimeName = "node-" + connectionId + ".latency"; Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName); metricName = new MetricName("request-latency-avg", metricGrpName, tags); nodeRequestTime.add(metricName, new Avg()); @@ -554,22 +614,22 @@ public class Selector implements Selectable { } } - public void recordBytesSent(int node, int bytes) { + public void recordBytesSent(String connectionId, long bytes) { long now = time.milliseconds(); this.bytesSent.record(bytes, now); - if (node >= 0) { - String nodeRequestName = "node-" + node + ".bytes-sent"; + if (!connectionId.isEmpty()) { + String nodeRequestName = "node-" + connectionId + ".bytes-sent"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); if (nodeRequest != null) nodeRequest.record(bytes, now); } } - public void recordBytesReceived(int node, int bytes) { + public void recordBytesReceived(String connection, int bytes) { long now = time.milliseconds(); this.bytesReceived.record(bytes, now); - if (node >= 0) { - String nodeRequestName = "node-" + node + ".bytes-received"; + if (!connection.isEmpty()) { + String nodeRequestName = "node-" + connection + ".bytes-received"; Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); if (nodeRequest != null) nodeRequest.record(bytes, now);
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/network/Send.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java index 5d321a0..8f6daad 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Send.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java @@ -13,7 +13,6 @@ package org.apache.kafka.common.network; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; /** @@ -24,12 +23,7 @@ public interface Send { /** * The numeric id for the destination of this send */ - public int destination(); - - /** - * The number of bytes remaining to send - */ - public int remaining(); + public String destination(); /** * Is this send complete? @@ -37,11 +31,6 @@ public interface Send { public boolean completed(); /** - * An optional method to turn this send into an array of ByteBuffers if possible (otherwise returns null) - */ - public ByteBuffer[] reify(); - - /** * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send * to be completely written * @param channel The channel to write to @@ -50,4 +39,9 @@ public interface Send { */ public long writeTo(GatheringByteChannel channel) throws IOException; + /** + * Size of the send + */ + public long size(); + } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java index 27cbf39..3fec60b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java @@ -25,7 +25,7 @@ public class RequestSend extends NetworkSend { private final RequestHeader header; private final Struct body; - public RequestSend(int destination, RequestHeader header, Struct body) { + public RequestSend(String destination, RequestHeader header, Struct body) { super(destination, serialize(header, body)); this.header = header; this.body = body; http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java new file mode 100644 index 0000000..12b06d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class ResponseSend extends NetworkSend { + + public ResponseSend(String destination, ResponseHeader header, Struct body) { + super(destination, serialize(header, body)); + } + + public ResponseSend(String destination, ResponseHeader header, AbstractRequestResponse response) { + this(destination, header, response.toStruct()); + } + + private static ByteBuffer serialize(ResponseHeader header, Struct body) { + ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf()); + header.writeTo(buffer); + body.writeTo(buffer); + buffer.rewind(); + return buffer; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 5e3fab1..d9c97e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -78,7 +78,7 @@ public class MockClient implements KafkaClient { return false; } - public void disconnect(Integer node) { + public void disconnect(String node) { Iterator<ClientRequest> iter = requests.iterator(); while (iter.hasNext()) { ClientRequest request = iter.next(); @@ -115,7 +115,7 @@ public class MockClient implements KafkaClient { } @Override - public List<ClientResponse> completeAll(int node, long now) { + public List<ClientResponse> completeAll(String node, long now) { return completeAll(now); } @@ -158,7 +158,7 @@ public class MockClient implements KafkaClient { } @Override - public int inFlightRequestCount(int nodeId) { + public int inFlightRequestCount(String nodeId) { return requests.size(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 8b27889..43238ce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -65,7 +65,7 @@ public class NetworkClientTest { client.poll(1, time.milliseconds()); selector.clear(); assertTrue("Now the client is ready", client.ready(node, time.milliseconds())); - selector.disconnect(node.id()); + selector.disconnect(node.idString()); client.poll(1, time.milliseconds()); selector.clear(); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); @@ -74,7 +74,7 @@ public class NetworkClientTest { @Test(expected = IllegalStateException.class) public void testSendToUnreadyNode() { - RequestSend send = new RequestSend(5, + RequestSend send = new RequestSend("5", client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); @@ -86,7 +86,7 @@ public class NetworkClientTest { public void testSimpleRequestResponse() { ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap()); RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE); - RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); + RequestSend send = new RequestSend(node.idString(), reqHeader, produceRequest.toStruct()); TestCallbackHandler handler = new TestCallbackHandler(); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler); awaitReady(client, node); @@ -101,7 +101,7 @@ public class NetworkClientTest { respHeader.writeTo(buffer); resp.writeTo(buffer); buffer.flip(); - selector.completeReceive(new NetworkReceive(node.id(), buffer)); + selector.completeReceive(new NetworkReceive(node.idString(), buffer)); List<ClientResponse> responses = client.poll(1, time.milliseconds()); assertEquals(1, responses.size()); assertTrue("The handler should have executed.", handler.executed); http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index d5b306b..d23b4b6 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -22,10 +22,7 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.*; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; @@ -49,7 +46,7 @@ public class SelectorTest { public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>()); + this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>()); } @After @@ -63,7 +60,7 @@ public class SelectorTest { */ @Test public void testServerDisconnect() throws Exception { - int node = 0; + String node = "0"; // connect and do a simple request blockingConnect(node); @@ -84,7 +81,7 @@ public class SelectorTest { */ @Test public void testClientDisconnect() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); selector.disconnect(node); selector.send(createSend(node, "hello1")); @@ -101,7 +98,7 @@ public class SelectorTest { */ @Test(expected = IllegalStateException.class) public void testCantSendWithInProgress() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); selector.send(createSend(node, "test1")); selector.send(createSend(node, "test2")); @@ -113,7 +110,7 @@ public class SelectorTest { */ @Test(expected = IllegalStateException.class) public void testCantSendWithoutConnecting() throws Exception { - selector.send(createSend(0, "test")); + selector.send(createSend("0", "test")); selector.poll(1000L); } @@ -122,7 +119,7 @@ public class SelectorTest { */ @Test(expected = IOException.class) public void testNoRouteToHost() throws Exception { - selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); + selector.connect("0", new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE); } /** @@ -130,7 +127,7 @@ public class SelectorTest { */ @Test public void testConnectionRefused() throws Exception { - int node = 0; + String node = "0"; ServerSocket nonListeningSocket = new ServerSocket(0); int nonListeningPort = nonListeningSocket.getLocalPort(); selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE); @@ -151,14 +148,15 @@ public class SelectorTest { // create connections InetSocketAddress addr = new InetSocketAddress("localhost", server.port); for (int i = 0; i < conns; i++) - selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE); - + selector.connect(Integer.toString(i), addr, BUFFER_SIZE, BUFFER_SIZE); // send echo requests and receive responses - int[] requests = new int[conns]; - int[] responses = new int[conns]; + Map<String, Integer> requests = new HashMap<String, Integer>(); + Map<String, Integer> responses = new HashMap<String, Integer>(); int responseCount = 0; - for (int i = 0; i < conns; i++) - selector.send(createSend(i, i + "-" + 0)); + for (int i = 0; i < conns; i++) { + String node = Integer.toString(i); + selector.send(createSend(node, node + "-0")); + } // loop until we complete all requests while (responseCount < conns * reqs) { @@ -171,19 +169,27 @@ public class SelectorTest { for (NetworkReceive receive : selector.completedReceives()) { String[] pieces = asString(receive).split("-"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); - assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0])); + assertEquals("Check the source", receive.source(), pieces[0]); assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position()); - assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1])); - responses[receive.source()]++; // increment the expected counter + if (responses.containsKey(receive.source())) { + assertEquals("Check the request counter", (int) responses.get(receive.source()), Integer.parseInt(pieces[1])); + responses.put(receive.source(), responses.get(receive.source()) + 1); + } else { + assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1])); + responses.put(receive.source(), 1); + } responseCount++; } // prepare new sends for the next round - for (NetworkSend send : selector.completedSends()) { - int dest = send.destination(); - requests[dest]++; - if (requests[dest] < reqs) - selector.send(createSend(dest, dest + "-" + requests[dest])); + for (Send send : selector.completedSends()) { + String dest = send.destination(); + if (requests.containsKey(dest)) + requests.put(dest, requests.get(dest) + 1); + else + requests.put(dest, 1); + if (requests.get(dest) < reqs) + selector.send(createSend(dest, dest + "-" + requests.get(dest))); } } } @@ -193,7 +199,7 @@ public class SelectorTest { */ @Test public void testSendLargeRequest() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); String big = TestUtils.randomString(10 * BUFFER_SIZE); assertEquals(big, blockingRequest(node, big)); @@ -204,41 +210,41 @@ public class SelectorTest { */ @Test public void testEmptyRequest() throws Exception { - int node = 0; + String node = "0"; blockingConnect(node); assertEquals("", blockingRequest(node, "")); } @Test(expected = IllegalStateException.class) public void testExistingConnectionId() throws IOException { - blockingConnect(0); - blockingConnect(0); + blockingConnect("0"); + blockingConnect("0"); } @Test public void testMute() throws Exception { - blockingConnect(0); - blockingConnect(1); + blockingConnect("0"); + blockingConnect("1"); - selector.send(createSend(0, "hello")); - selector.send(createSend(1, "hi")); + selector.send(createSend("0", "hello")); + selector.send(createSend("1", "hi")); - selector.mute(1); + selector.mute("1"); while (selector.completedReceives().isEmpty()) selector.poll(5); assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source()); + assertEquals("The response should not be from the muted node", "0", selector.completedReceives().get(0).source()); - selector.unmute(1); + selector.unmute("1"); do { selector.poll(5); } while (selector.completedReceives().isEmpty()); assertEquals("We should have only one response", 1, selector.completedReceives().size()); - assertEquals("The response should be from the previously muted node", 1, selector.completedReceives().get(0).source()); + assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source()); } - private String blockingRequest(int node, String s) throws IOException { + private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s)); selector.poll(1000L); while (true) { @@ -250,13 +256,13 @@ public class SelectorTest { } /* connect and wait for the connection to complete */ - private void blockingConnect(int node) throws IOException { + private void blockingConnect(String node) throws IOException { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); while (!selector.connected().contains(node)) selector.poll(10000L); } - private NetworkSend createSend(int node, String s) { + private NetworkSend createSend(String node, String s) { return new NetworkSend(node, ByteBuffer.wrap(s.getBytes())); } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/clients/src/test/java/org/apache/kafka/test/MockSelector.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index ea89b06..51eb9d1 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.network.Send; import org.apache.kafka.common.utils.Time; /** @@ -28,23 +29,23 @@ import org.apache.kafka.common.utils.Time; public class MockSelector implements Selectable { private final Time time; - private final List<NetworkSend> initiatedSends = new ArrayList<NetworkSend>(); - private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>(); + private final List<Send> initiatedSends = new ArrayList<Send>(); + private final List<Send> completedSends = new ArrayList<Send>(); private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>(); - private final List<Integer> disconnected = new ArrayList<Integer>(); - private final List<Integer> connected = new ArrayList<Integer>(); + private final List<String> disconnected = new ArrayList<String>(); + private final List<String> connected = new ArrayList<String>(); public MockSelector(Time time) { this.time = time; } @Override - public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { this.connected.add(id); } @Override - public void disconnect(int id) { + public void disconnect(String id) { this.disconnected.add(id); } @@ -64,7 +65,7 @@ public class MockSelector implements Selectable { } @Override - public void send(NetworkSend send) { + public void send(Send send) { this.initiatedSends.add(send); } @@ -76,7 +77,7 @@ public class MockSelector implements Selectable { } @Override - public List<NetworkSend> completedSends() { + public List<Send> completedSends() { return completedSends; } @@ -94,21 +95,21 @@ public class MockSelector implements Selectable { } @Override - public List<Integer> disconnected() { + public List<String> disconnected() { return disconnected; } @Override - public List<Integer> connected() { + public List<String> connected() { return connected; } @Override - public void mute(int id) { + public void mute(String id) { } @Override - public void unmute(int id) { + public void unmute(String id) { } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/Kafka.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 9efabaa..6af7b80 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -17,6 +17,8 @@ package kafka +import java.util.Properties + import scala.collection.JavaConversions._ import joptsimple.OptionParser import metrics.KafkaMetricsReporter @@ -26,7 +28,7 @@ import org.apache.kafka.common.utils.Utils object Kafka extends Logging { - def getKafkaConfigFromArgs(args: Array[String]): KafkaConfig = { + def getPropsFromArgs(args: Array[String]): Properties = { val optionParser = new OptionParser val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file") .withRequiredArg() @@ -47,14 +49,14 @@ object Kafka extends Logging { props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt))) } - - KafkaConfig.fromProps(props) + props } def main(args: Array[String]): Unit = { try { - val serverConfig = getKafkaConfigFromArgs(args) - KafkaMetricsReporter.startReporters(new VerifiableProperties(serverConfig.toProps)) + val serverProps = getPropsFromArgs(args) + val serverConfig = KafkaConfig.fromProps(serverProps) + KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) val kafkaServerStartable = new KafkaServerStartable(serverConfig) // attach shutdown handler to catch control-c http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 6d1c6ab..f23120e 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -174,7 +174,7 @@ object ConsumerGroupCommand { val offsetMap = mutable.Map[TopicAndPartition, Long]() val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala index a3b1b78..258d5fe 100644 --- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.network.RequestChannel.Response + import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} +import kafka.network.RequestChannel.Response object ConsumerMetadataRequest { val CurrentVersion = 0.shortValue @@ -64,7 +65,7 @@ case class ConsumerMetadataRequest(group: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { // return ConsumerCoordinatorNotAvailable for all uncaught errors val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } def describe(details: Boolean) = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index fe81635..8092007 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -18,10 +18,9 @@ package kafka.api import java.nio.ByteBuffer -import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.{TopicAndPartition, ErrorMapping} + +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -63,7 +62,7 @@ case class ControlledShutdownRequest(versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean = false): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index b038c15..5b38f85 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -149,7 +149,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty)) } val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse))) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/FetchResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 75aaf57..0b6b33a 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -22,8 +22,10 @@ import java.nio.channels.GatheringByteChannel import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} -import kafka.network.{MultiSend, Send} import kafka.api.ApiUtils._ +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.network.Send +import org.apache.kafka.common.network.MultiSend import scala.collection._ @@ -62,10 +64,12 @@ class PartitionDataSend(val partitionId: Int, buffer.putInt(partitionData.messages.sizeInBytes) buffer.rewind() - override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize - override def writeTo(channel: GatheringByteChannel): Int = { - var written = 0 + override def destination: String = "" + + override def writeTo(channel: GatheringByteChannel): Long = { + var written = 0L if(buffer.hasRemaining) written += channel.write(buffer) if(!buffer.hasRemaining && messagesSentSize < messageSize) { @@ -75,6 +79,8 @@ class PartitionDataSend(val partitionId: Int, } written } + + override def size = buffer.capacity() + messageSize } object TopicData { @@ -101,29 +107,32 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti val headerSize = TopicData.headerSize(topic) } -class TopicDataSend(val topicData: TopicData) extends Send { - private val size = topicData.sizeInBytes +class TopicDataSend(val dest: String, val topicData: TopicData) extends Send { + + private var sent = 0L - private var sent = 0 + override def completed: Boolean = sent >= size - override def complete = sent >= size + override def destination: String = dest + + override def size = topicData.headerSize + sends.size() private val buffer = ByteBuffer.allocate(topicData.headerSize) writeShortString(buffer, topicData.topic) buffer.putInt(topicData.partitionData.size) buffer.rewind() - val sends = new MultiSend(topicData.partitionData.toList - .map(d => new PartitionDataSend(d._1, d._2))) { - val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize - } + private val sends = new MultiSend(dest, + JavaConversions.seqAsJavaList(topicData.partitionData.toList.map(d => new PartitionDataSend(d._1, d._2)))) - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 + override def writeTo(channel: GatheringByteChannel): Long = { + if (completed) + throw new KafkaException("This operation cannot be completed on a complete request.") + + var written = 0L if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written @@ -200,34 +209,36 @@ case class FetchResponse(correlationId: Int, data: Map[TopicAndPartition, FetchR } -class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { - private val size = fetchResponse.sizeInBytes +class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send { + private val payloadSize = fetchResponse.sizeInBytes + + private var sent = 0L - private var sent = 0 + override def size = 4 /* for size byte */ + payloadSize - private val sendSize = 4 /* for size */ + size + override def completed = sent >= size - override def complete = sent >= sendSize + override def destination = dest private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) - buffer.putInt(size) + buffer.putInt(payloadSize) buffer.putInt(fetchResponse.correlationId) buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count buffer.rewind() - val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map { - case(topic, data) => new TopicDataSend(TopicData(topic, + private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.dataGroupedByTopic.toList.map { + case(topic, data) => new TopicDataSend(dest, TopicData(topic, data.map{case(topicAndPartition, message) => (topicAndPartition.partition, message)})) - }) { - val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize - } + })) + + override def writeTo(channel: GatheringByteChannel): Long = { + if (completed) + throw new KafkaException("This operation cannot be completed on a complete request.") - def writeTo(channel: GatheringByteChannel):Int = { - expectIncomplete() - var written = 0 + var written = 0L if(buffer.hasRemaining) written += channel.write(buffer) - if(!buffer.hasRemaining && !sends.complete) { + if(!buffer.hasRemaining && !sends.completed) { written += sends.writeTo(channel) } sent += written http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 431190a..c2584e0 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -19,14 +19,16 @@ package kafka.api import java.nio._ -import kafka.utils._ + import kafka.api.ApiUtils._ import kafka.cluster.BrokerEndPoint -import kafka.controller.LeaderIsrAndControllerEpoch -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import collection.Set +import kafka.utils._ + +import scala.collection.Set object LeaderAndIsr { @@ -184,7 +186,7 @@ case class LeaderAndIsrRequest (versionId: Short, case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = LeaderAndIsrResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 317daed..5b362ef 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -18,11 +18,13 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.{SystemTime, Logging} -import kafka.network.{RequestChannel, BoundedByteBufferSend} -import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response +import kafka.utils.Logging + import scala.collection._ object OffsetCommitRequest extends Logging { @@ -162,7 +164,7 @@ case class OffsetCommitRequest(groupId: String, val commitStatus = requestInfo.mapValues(_ => errorCode) val commitResponse = OffsetCommitResponse(commitStatus, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(commitResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index fa8bd6a..a83e147 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -17,16 +17,13 @@ package kafka.api +import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import kafka.utils.Logging -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common._ -import kafka.common.TopicAndPartition +import kafka.common.{TopicAndPartition, _} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response - -import scala.Some - -import java.nio.ByteBuffer +import kafka.utils.Logging object OffsetFetchRequest extends Logging { val CurrentVersion: Short = 1 @@ -99,7 +96,7 @@ case class OffsetFetchRequest(groupId: String, )) }.toMap val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/OffsetRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 3d483bc..f418868 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -18,9 +18,10 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.{ErrorMapping, TopicAndPartition} + import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response @@ -117,7 +118,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null)) } val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 570b2da..c866180 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -18,11 +18,12 @@ package kafka.api import java.nio._ -import kafka.message._ + import kafka.api.ApiUtils._ import kafka.common._ +import kafka.message._ +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{RequestChannel, BoundedByteBufferSend} object ProducerRequest { val CurrentVersion = 0.shortValue @@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l)) } val errorResponse = ProducerResponse(correlationId, producerResponseStatus) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/RequestKeys.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index ef7a86e..155cb65 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -20,6 +20,8 @@ package kafka.api import kafka.common.KafkaException import java.nio.ByteBuffer +import kafka.network.InvalidRequestException + object RequestKeys { val ProduceKey: Short = 0 val FetchKey: Short = 1 @@ -59,7 +61,7 @@ object RequestKeys { def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = { keyToNameAndDeserializerMap.get(key) match { case Some(nameAndSerializer) => nameAndSerializer._2 - case None => throw new KafkaException("Wrong request type %d".format(key)) + case None => throw new InvalidRequestException("Wrong request type %d".format(key)) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/StopReplicaRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala index 5e14987..4441fc6 100644 --- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio._ import kafka.api.ApiUtils._ -import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException} +import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestException} import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.network.RequestChannel.Response import kafka.utils.Logging @@ -106,7 +106,7 @@ case class StopReplicaRequest(versionId: Short, case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) }.toMap val errorResponse = StopReplicaResponse(correlationId, responseMap) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/TopicMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 363bae0..401c583 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -18,13 +18,15 @@ package kafka.api import java.nio.ByteBuffer + import kafka.api.ApiUtils._ -import collection.mutable.ListBuffer -import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging +import scala.collection.mutable.ListBuffer + object TopicMetadataRequest extends Logging { val CurrentVersion = 0.shortValue val DefaultClientId = "" @@ -80,7 +82,7 @@ case class TopicMetadataRequest(versionId: Short, topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala index 69f0397..d59de82 100644 --- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala @@ -21,8 +21,8 @@ import java.nio.ByteBuffer import kafka.api.ApiUtils._ import kafka.cluster.{Broker, BrokerEndPoint} import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition} +import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -import kafka.network.{BoundedByteBufferSend, RequestChannel} import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection.Set @@ -128,7 +128,7 @@ case class UpdateMetadataRequest (versionId: Short, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { val errorResponse = new UpdateMetadataResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]])) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } override def describe(details: Boolean): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 62394c0..68c7e7f 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -153,7 +153,7 @@ object ClientUtils extends Logging{ debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) queryChannel.send(ConsumerMetadataRequest(group)) val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer) + val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload()) debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 31a2639..c16f7ed 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -24,6 +24,7 @@ import kafka.api._ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} +import org.apache.kafka.common.network.{NetworkReceive, Receive} import org.apache.kafka.common.utils.Utils._ /** @@ -65,9 +66,9 @@ class SimpleConsumer(val host: String, } } - private def sendRequest(request: RequestOrResponse): Receive = { + private def sendRequest(request: RequestOrResponse): NetworkReceive = { lock synchronized { - var response: Receive = null + var response: NetworkReceive = null try { getOrMakeConnection() blockingChannel.send(request) @@ -94,12 +95,12 @@ class SimpleConsumer(val host: String, def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = sendRequest(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload()) } def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { val response = sendRequest(request) - ConsumerMetadataResponse.readFrom(response.buffer) + ConsumerMetadataResponse.readFrom(response.payload()) } /** @@ -109,7 +110,7 @@ class SimpleConsumer(val host: String, * @return a set of fetched messages */ def fetch(request: FetchRequest): FetchResponse = { - var response: Receive = null + var response: NetworkReceive = null val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer aggregateTimer.time { @@ -117,7 +118,7 @@ class SimpleConsumer(val host: String, response = sendRequest(request) } } - val fetchResponse = FetchResponse.readFrom(response.buffer) + val fetchResponse = FetchResponse.readFrom(response.payload()) val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) @@ -129,7 +130,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetRequest]] object. * @return a [[kafka.api.OffsetResponse]] object. */ - def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer) + def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload()) /** * Commit offsets for a topic @@ -140,7 +141,7 @@ class SimpleConsumer(val host: String, def commitOffsets(request: OffsetCommitRequest) = { // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before // we can commit offsets. - OffsetCommitResponse.readFrom(sendRequest(request).buffer) + OffsetCommitResponse.readFrom(sendRequest(request).payload()) } /** @@ -149,7 +150,7 @@ class SimpleConsumer(val host: String, * @param request a [[kafka.api.OffsetFetchRequest]] object. * @return a [[kafka.api.OffsetFetchResponse]] object. */ - def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer) + def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload()) private def getOrMakeConnection() { if(!isClosed && !blockingChannel.isConnected) { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index a7f2acc..e42d104 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -334,7 +334,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { kafkaCommitMeter.mark(offsetsToCommit.size) offsetsChannel.send(offsetCommitRequest) - val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset commit response: %s.".format(offsetCommitResponse)) val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { @@ -421,7 +421,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, ensureOffsetManagerConnected() try { offsetsChannel.send(offsetFetchRequest) - val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().payload()) trace("Offset fetch response: %s.".format(offsetFetchResponse)) val (leaderChanged, loadInProgress) = http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 6cf13f0..9f521fa 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -16,8 +16,9 @@ */ package kafka.controller -import kafka.network.{Receive, BlockingChannel} +import kafka.network.BlockingChannel import kafka.utils.{CoreUtils, Logging, ShutdownableThread} +import org.apache.kafka.common.network.NetworkReceive import collection.mutable.HashMap import kafka.cluster.Broker import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} @@ -120,7 +121,7 @@ class RequestSendThread(val controllerId: Int, val queueItem = queue.take() val request = queueItem._1 val callback = queueItem._2 - var receive: Receive = null + var receive: NetworkReceive = null try { lock synchronized { var isSendSuccessful = false @@ -147,11 +148,11 @@ class RequestSendThread(val controllerId: Int, var response: RequestOrResponse = null request.requestId.get match { case RequestKeys.LeaderAndIsrKey => - response = LeaderAndIsrResponse.readFrom(receive.buffer) + response = LeaderAndIsrResponse.readFrom(receive.payload()) case RequestKeys.StopReplicaKey => - response = StopReplicaResponse.readFrom(receive.buffer) + response = StopReplicaResponse.readFrom(receive.payload()) case RequestKeys.UpdateMetadataKey => - response = UpdateMetadataResponse.readFrom(receive.buffer) + response = UpdateMetadataResponse.readFrom(receive.payload()) } stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s" .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString)) http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index b0b7be1..568d0ac 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -16,12 +16,11 @@ */ package kafka.javaapi -import kafka.api._ import java.nio.ByteBuffer + +import kafka.api._ + import scala.collection.mutable -import kafka.network.{BoundedByteBufferSend, RequestChannel} -import kafka.common.ErrorMapping -import kafka.network.RequestChannel.Response class TopicMetadataRequest(val versionId: Short, val correlationId: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BlockingChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 6e2a38e..1197259 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -19,8 +19,10 @@ package kafka.network import java.net.InetSocketAddress import java.nio.channels._ -import kafka.utils.{nonthreadsafe, Logging} + import kafka.api.RequestOrResponse +import kafka.utils.{Logging, nonthreadsafe} +import org.apache.kafka.common.network.NetworkReceive object BlockingChannel{ @@ -43,6 +45,7 @@ class BlockingChannel( val host: String, private var writeChannel: GatheringByteChannel = null private val lock = new Object() private val connectTimeoutMs = readTimeoutMs + private var connectionId: String = "" def connect() = lock synchronized { if(!connected) { @@ -59,8 +62,15 @@ class BlockingChannel( val host: String, channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) writeChannel = channel + // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout + // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work readChannel = Channels.newChannel(channel.socket().getInputStream) connected = true + val localHost = channel.socket.getLocalAddress.getHostAddress + val localPort = channel.socket.getLocalPort + val remoteHost = channel.socket.getInetAddress.getHostAddress + val remotePort = channel.socket.getPort + connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort // settings may not match what we requested above val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d." debug(msg.format(channel.socket.getSoTimeout, @@ -95,20 +105,21 @@ class BlockingChannel( val host: String, def isConnected = connected - def send(request: RequestOrResponse):Int = { + def send(request: RequestOrResponse): Long = { if(!connected) throw new ClosedChannelException() - val send = new BoundedByteBufferSend(request) + val send = new RequestOrResponseSend(connectionId, request) send.writeCompletely(writeChannel) } - def receive(): Receive = { + def receive(): NetworkReceive = { if(!connected) throw new ClosedChannelException() - val response = new BoundedByteBufferReceive() + val response = new NetworkReceive() response.readCompletely(readChannel) + response.payload().rewind() response }