Repository: kafka Updated Branches: refs/heads/trunk 05d00b5ac -> 24fd025d4
KAFKA-3916; Check for disconnects properly before sending from the controller Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1734 from hachikuji/KAFKA-3916 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24fd025d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24fd025d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24fd025d Branch: refs/heads/trunk Commit: 24fd025d407d42176847143d3a0dc416a75d8f35 Parents: 05d00b5 Author: Jason Gustafson <[email protected]> Authored: Tue Aug 23 02:55:17 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Aug 23 02:59:30 2016 +0100 ---------------------------------------------------------------------- .../apache/kafka/common/network/Selector.java | 125 +++++++++++++------ .../controller/ControllerChannelManager.scala | 19 ++- .../kafka/utils/NetworkClientBlockingOps.scala | 58 ++++----- 3 files changed, 120 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index ab9dab9..5244710 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory; */ public class Selector implements Selectable { + public static final long NO_IDLE_TIMEOUT_MS = -1; private static final Logger log = LoggerFactory.getLogger(Selector.class); private final java.nio.channels.Selector nioSelector; @@ -93,25 +94,36 @@ public class Selector implements Selectable { private final String metricGrpPrefix; private final Map<String, String> metricTags; private final ChannelBuilder channelBuilder; - private final Map<String, Long> lruConnections; - private final long connectionsMaxIdleNanos; private final int maxReceiveSize; private final boolean metricsPerConnection; - private long currentTimeNanos; - private long nextIdleCloseCheckTime; - + private final IdleExpiryManager idleExpiryManager; /** * Create a new nioSelector + * + * @param maxReceiveSize Max size in bytes of a single network receive (use {@link NetworkReceive#UNLIMITED} for no limit) + * @param connectionMaxIdleMs Max idle connection time (use {@link #NO_IDLE_TIMEOUT_MS} to disable idle timeout) + * @param metrics Registry for Selector metrics + * @param time Time implementation + * @param metricGrpPrefix Prefix for the group of metrics registered by Selector + * @param metricTags Additional tags to add to metrics registered by Selector + * @param metricsPerConnection Whether or not to enable per-connection metrics + * @param channelBuilder Channel builder for every new connection */ - public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) { + public Selector(int maxReceiveSize, + long connectionMaxIdleMs, + Metrics metrics, + Time time, + String metricGrpPrefix, + Map<String, String> metricTags, + boolean metricsPerConnection, + ChannelBuilder channelBuilder) { try { this.nioSelector = java.nio.channels.Selector.open(); } catch (IOException e) { throw new KafkaException(e); } this.maxReceiveSize = maxReceiveSize; - this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000; this.time = time; this.metricGrpPrefix = metricGrpPrefix; this.metricTags = metricTags; @@ -125,11 +137,8 @@ public class Selector implements Selectable { this.failedSends = new ArrayList<>(); this.sensors = new SelectorMetrics(metrics); this.channelBuilder = channelBuilder; - // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true - this.lruConnections = new LinkedHashMap<>(16, .75F, true); - currentTimeNanos = time.nanoseconds(); - nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; this.metricsPerConnection = metricsPerConnection; + this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs); } public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) { @@ -276,22 +285,26 @@ public class Selector implements Selectable { long startSelect = time.nanoseconds(); int readyKeys = select(timeout); long endSelect = time.nanoseconds(); - currentTimeNanos = endSelect; this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { - pollSelectionKeys(this.nioSelector.selectedKeys(), false); - pollSelectionKeys(immediatelyConnectedKeys, true); + pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); + pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); - maybeCloseOldestConnection(); + + // we use the time at the end of select to ensure that we don't close any connections that + // have just been processed in pollSelectionKeys + maybeCloseOldestConnection(endSelect); } - private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) { + private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, + boolean isImmediatelyConnected, + long currentTimeNanos) { Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); @@ -300,7 +313,8 @@ public class Selector implements Selectable { // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); - lruConnections.put(channel.id(), currentTimeNanos); + if (idleExpiryManager != null) + idleExpiryManager.update(channel.id(), currentTimeNanos); try { @@ -409,24 +423,20 @@ public class Selector implements Selectable { unmute(channel); } - private void maybeCloseOldestConnection() { - if (currentTimeNanos > nextIdleCloseCheckTime) { - if (lruConnections.isEmpty()) { - nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; - } else { - Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); - Long connectionLastActiveTime = oldestConnectionEntry.getValue(); - nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; - if (currentTimeNanos > nextIdleCloseCheckTime) { - String connectionId = oldestConnectionEntry.getKey(); - if (log.isTraceEnabled()) - log.trace("About to close the idle connection from " + connectionId - + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); - - disconnected.add(connectionId); - close(connectionId); - } - } + private void maybeCloseOldestConnection(long currentTimeNanos) { + if (idleExpiryManager == null) + return; + + Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos); + if (expiredConnection != null) { + String connectionId = expiredConnection.getKey(); + + if (log.isTraceEnabled()) + log.trace("About to close the idle connection from {} due to being idle for {} millis", + connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); + + disconnected.add(connectionId); + close(connectionId); } } @@ -480,8 +490,10 @@ public class Selector implements Selectable { } this.stagedReceives.remove(channel); this.channels.remove(channel.id()); - this.lruConnections.remove(channel.id()); this.sensors.connectionClosed.record(); + + if (idleExpiryManager != null) + idleExpiryManager.remove(channel.id()); } @@ -726,4 +738,45 @@ public class Selector implements Selectable { } } + // helper class for tracking least recently used connections to enable idle connection closing + private static class IdleExpiryManager { + private final Map<String, Long> lruConnections; + private final long connectionsMaxIdleNanos; + private long nextIdleCloseCheckTime; + + public IdleExpiryManager(Time time, long connectionsMaxIdleMs) { + this.connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000; + // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true + this.lruConnections = new LinkedHashMap<>(16, .75F, true); + this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos; + } + + public void update(String connectionId, long currentTimeNanos) { + lruConnections.put(connectionId, currentTimeNanos); + } + + public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) { + if (currentTimeNanos <= nextIdleCloseCheckTime) + return null; + + if (lruConnections.isEmpty()) { + nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos; + return null; + } + + Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next(); + Long connectionLastActiveTime = oldestConnectionEntry.getValue(); + nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos; + + if (currentTimeNanos > nextIdleCloseCheckTime) + return oldestConnectionEntry; + else + return null; + } + + public void remove(String connectionId) { + lruConnections.remove(connectionId); + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3007004..c46a536 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -100,7 +100,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf ) val selector = new Selector( NetworkReceive.UNLIMITED, - config.connectionsMaxIdleMs, + Selector.NO_IDLE_TIMEOUT_MS, metrics, time, "controller-channel", @@ -167,7 +167,7 @@ class RequestSendThread(val controllerId: Int, override def doWork(): Unit = { - def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300)) + def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100)) val QueueItem(apiKey, apiVersion, request, callback) = queue.take() import NetworkClientBlockingOps._ @@ -226,18 +226,13 @@ class RequestSendThread(val controllerId: Int, private def brokerReady(): Boolean = { import NetworkClientBlockingOps._ try { + val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time) - if (networkClient.isReady(brokerNode, time.milliseconds())) - true - else { - val ready = networkClient.blockingReady(brokerNode, socketTimeoutMs)(time) + if (!ready) + throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") - if (!ready) - throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") - - info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString())) - true - } + info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString())) + true } catch { case e: Throwable => warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e) http://git-wip-us.apache.org/repos/asf/kafka/blob/24fd025d/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala index fd4af6e..9aca663 100644 --- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala +++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala @@ -45,24 +45,42 @@ object NetworkClientBlockingOps { class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { /** - * Invokes `client.ready` followed by 0 or more `client.poll` invocations until the connection to `node` is ready, - * the timeout expires or the connection fails. + * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll` + * invocations until the connection to `node` is ready, the timeout expires or the connection fails. * * It returns `true` if the call completes normally or `false` if the timeout expires. If the connection fails, - * an `IOException` is thrown instead. + * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive + * connection timeout, it is possible for this method to raise an `IOException` for a previous connection which + * has recently disconnected. * * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with * care. */ def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = { require(timeout >=0, "timeout should be >= 0") - client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) => - if (client.isReady(node, now)) + + val startTime = time.milliseconds() + val expiryTime = startTime + timeout + + @tailrec + def awaitReady(iterationStartTime: Long): Boolean = { + if (client.isReady(node, iterationStartTime)) true else if (client.connectionFailed(node)) throw new IOException(s"Connection to $node failed") - else false + else { + val pollTimeout = expiryTime - iterationStartTime + client.poll(pollTimeout, iterationStartTime) + val afterPollTime = time.milliseconds() + if (afterPollTime < expiryTime) awaitReady(afterPollTime) + else false + } } + + // poll once to receive pending disconnects + client.poll(0, startTime) + + client.ready(node, startTime) || awaitReady(startTime) } /** @@ -93,34 +111,6 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal { } /** - * Invokes `client.poll` until `predicate` returns `true` or the timeout expires. - * - * It returns `true` if the call completes normally or `false` if the timeout expires. Exceptions thrown via - * `predicate` are not handled and will bubble up. - * - * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with - * care. - */ - private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = { - val methodStartTime = time.milliseconds() - val timeoutExpiryTime = methodStartTime + timeout - - @tailrec - def recursivePoll(iterationStartTime: Long): Boolean = { - val pollTimeout = timeoutExpiryTime - iterationStartTime - val responses = client.poll(pollTimeout, iterationStartTime).asScala - if (predicate(responses, iterationStartTime)) true - else { - val afterPollTime = time.milliseconds() - if (afterPollTime < timeoutExpiryTime) recursivePoll(afterPollTime) - else false - } - } - - recursivePoll(methodStartTime) - } - - /** * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned. * * Exceptions thrown via `collect` are not handled and will bubble up.
