This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new ede401b Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305) ede401b is described below commit ede401b86eaa7688cf7e922b79218c66638833f4 Author: Nicholas Parker <n...@thelastpickle.com> AuthorDate: Thu Mar 7 21:07:23 2019 +1300 Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305) * Fix for KAFKA-7974: Avoid calling disconnect() when not connecting * Resolve host only when currentAddress() is called Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state. Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed. * Add Javadoc to ClusterConnectionStates.connecting() --- .../kafka/clients/ClusterConnectionStates.java | 68 +++++++++++++++------- .../org/apache/kafka/clients/NetworkClient.java | 10 ++-- .../kafka/clients/ClusterConnectionStatesTest.java | 14 ++--- .../apache/kafka/clients/NetworkClientTest.java | 8 ++- 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 376b35d..e9bd971 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -108,16 +109,18 @@ final class ClusterConnectionStates { } /** - * Enter the connecting state for the given connection. + * Enter the connecting state for the given connection, moving to a new resolved address if necessary. 
* @param id the id of the connection - * @param now the current time - * @throws UnknownHostException + * @param now the current time in ms + * @param host the host of the connection, to be resolved internally if needed + * @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host} */ - public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { + public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) { NodeConnectionState connectionState = nodeState.get(id); if (connectionState != null && connectionState.host().equals(host)) { connectionState.lastConnectAttemptMs = now; connectionState.state = ConnectionState.CONNECTING; + // Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved connectionState.moveToNextAddress(); return; } else if (connectionState != null) { @@ -130,14 +133,19 @@ final class ClusterConnectionStates { this.reconnectBackoffInitMs, host, clientDnsLookup)); } - public InetAddress currentAddress(String id) { - return nodeState.get(id).currentAddress(); + /** + * Returns a resolved address for the given connection, resolving it if necessary. + * @param id the id of the connection + * @throws UnknownHostException if the address was not resolvable + */ + public InetAddress currentAddress(String id) throws UnknownHostException { + return nodeState(id).currentAddress(); } /** * Enter the disconnected state for the given node. * @param id the connection we have disconnected - * @param now the current time + * @param now the current time in ms */ public void disconnected(String id, long now) { NodeConnectionState nodeState = nodeState(id); @@ -212,7 +220,7 @@ final class ClusterConnectionStates { /** * Enter the authentication failed state for the given node. 
* @param id the connection identifier - * @param now the current time + * @param now the current time in ms * @param exception the authentication exception */ public void authenticationFailed(String id, long now, AuthenticationException exception) { @@ -227,7 +235,7 @@ final class ClusterConnectionStates { * Return true if the connection is in the READY state and currently not throttled. * * @param id the connection identifier - * @param now the current time + * @param now the current time in ms */ public boolean isReady(String id, long now) { return isReady(nodeState.get(id), now); @@ -241,7 +249,7 @@ final class ClusterConnectionStates { * Return true if there is at least one node with connection in the READY state and not throttled. Returns false * otherwise. * - * @param now the current time + * @param now the current time in ms */ public boolean hasReadyNodes(long now) { for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) { @@ -353,14 +361,15 @@ final class ClusterConnectionStates { // Connection is being throttled if current time < throttleUntilTimeMs. 
long throttleUntilTimeMs; private List<InetAddress> addresses; - private int index = 0; + private int addressIndex; private final String host; private final ClientDnsLookup clientDnsLookup; - public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, - String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { + private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs, + String host, ClientDnsLookup clientDnsLookup) { this.state = state; - this.addresses = ClientUtils.resolve(host, clientDnsLookup); + this.addresses = Collections.emptyList(); + this.addressIndex = -1; this.authenticationException = null; this.lastConnectAttemptMs = lastConnectAttempt; this.failedAttempts = 0; @@ -374,17 +383,32 @@ final class ClusterConnectionStates { return host; } - public InetAddress currentAddress() { - return addresses.get(index); + /** + * Fetches the current selected IP address for this node, resolving {@link #host()} if necessary. + * @return the selected address + * @throws UnknownHostException if resolving {@link #host()} fails + */ + private InetAddress currentAddress() throws UnknownHostException { + if (addresses.isEmpty()) { + // (Re-)initialize list + addresses = ClientUtils.resolve(host, clientDnsLookup); + addressIndex = 0; + } + + return addresses.get(addressIndex); } - /* - * implementing a ring buffer with the addresses + /** + * Jumps to the next available resolved address for this node. If no other addresses are available, marks the + * list to be refreshed on the next {@link #currentAddress()} call. */ - public void moveToNextAddress() throws UnknownHostException { - index = (index + 1) % addresses.size(); - if (index == 0) - addresses = ClientUtils.resolve(host, clientDnsLookup); + private void moveToNextAddress() { + if (addresses.isEmpty()) + return; // Avoid div0. 
List will initialize on next currentAddress() call + + addressIndex = (addressIndex + 1) % addresses.size(); + if (addressIndex == 0) + addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call } public String toString() { diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4bef6a0..77f577a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -696,7 +696,7 @@ public class NetworkClient implements KafkaClient { * @param responses The list of responses to update * @param nodeId Id of the node to be disconnected * @param now The current time - * @param disconnectState The state of the disconnected channel + * @param disconnectState The state of the disconnected channel */ private void processDisconnection(List<ClientResponse> responses, String nodeId, @@ -905,23 +905,25 @@ public class NetworkClient implements KafkaClient { /** * Initiate a connection to the given node + * @param node the node to connect to + * @param now current time in epoch milliseconds */ private void initiateConnect(Node node, long now) { String nodeConnectionId = node.idString(); try { - this.connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup); - InetAddress address = this.connectionStates.currentAddress(nodeConnectionId); + connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup); + InetAddress address = connectionStates.currentAddress(nodeConnectionId); log.debug("Initiating connection to node {} using address {}", node, address); selector.connect(nodeConnectionId, new InetSocketAddress(address, node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { + log.warn("Error connecting to node {}", node, e); /* attempt failed, we'll try again after the backoff */ 
connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); - log.warn("Error connecting to node {}", node, e); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index 79afb75..19b701d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -53,7 +53,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testClusterConnectionStateChanges() throws UnknownHostException { + public void testClusterConnectionStateChanges() { assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); // Start connecting to Node and check state @@ -97,7 +97,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testMultipleNodeConnectionStates() throws UnknownHostException { + public void testMultipleNodeConnectionStates() { // Check initial state, allowed to connect to all nodes, but no nodes shown as ready assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds())); @@ -135,7 +135,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testAuthorizationFailed() throws UnknownHostException { + public void testAuthorizationFailed() { // Try connecting connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); @@ -156,7 +156,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testRemoveNode() throws UnknownHostException { + public void testRemoveNode() { connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); time.sleep(1000); connectionStates.ready(nodeId1); @@ -171,7 +171,7 @@ public class ClusterConnectionStatesTest { } @Test - 
public void testMaxReconnectBackoff() throws UnknownHostException { + public void testMaxReconnectBackoff() { long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter)); connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); time.sleep(1000); @@ -191,7 +191,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testExponentialReconnectBackoff() throws UnknownHostException { + public void testExponentialReconnectBackoff() { // Calculate fixed components for backoff process final int reconnectBackoffExpBase = 2; double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1)) @@ -211,7 +211,7 @@ public class ClusterConnectionStatesTest { } @Test - public void testThrottled() throws UnknownHostException { + public void testThrottled() { connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT); time.sleep(1000); connectionStates.ready(nodeId1); diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 8abe9a40..6d74826 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -82,7 +82,7 @@ public class NetworkClientTest { private NetworkClient createNetworkClientWithNoVersionDiscovery() { return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, - 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, + 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext()); } @@ -117,6 +117,12 @@ public class NetworkClientTest { } @Test + public void testDnsLookupFailure() { + /* Fail cleanly when the node has a bad hostname */ + assertFalse(client.ready(new Node(1234, "badhost", 1234), 
time.milliseconds())); + } + + @Test public void testClose() { client.ready(node, time.milliseconds()); awaitReady(client, node);