Repository: kafka Updated Branches: refs/heads/1.0 305e7949c -> 46fc8eca9
KAFKA-6289; NetworkClient should not expose failed internal ApiVersions requests The NetworkClient internally ApiVersion requests to each broker following connection establishment. If this request happens to fail (perhaps due to an incompatible broker), the NetworkClient includes the response in the result of poll(). Applications will generally not be expecting this response which may lead to failed assertions (or in the case of AdminClient, an obscure log message). I've added test cases which await the ApiVersion request sent by NetworkClient to be in-flight, and then disconnect the connection and verify that the response is not included from poll(). Author: Jason Gustafson <ja...@confluent.io> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #4280 from hachikuji/KAFKA-6289 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46fc8eca Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46fc8eca Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46fc8eca Branch: refs/heads/1.0 Commit: 46fc8eca933cdbce705474f595ef7d67e59364b3 Parents: 305e794 Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Dec 8 10:54:31 2017 +0000 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Fri Dec 8 10:59:24 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 10 ++-- .../apache/kafka/clients/NetworkClientTest.java | 60 ++++++++++++++++---- .../org/apache/kafka/test/MockSelector.java | 25 +++++++- 3 files changed, 78 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ee7258a..ea4eacc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -618,12 +618,12 @@ public class NetworkClient implements KafkaClient { break; // Disconnections in other states are logged at debug level in Selector } for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { - log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", request.request, - request.header.correlationId(), nodeId); - if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA) - metadataUpdater.handleDisconnection(request.destination); - else + log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected", + request.header.apiKey(), request.request, request.header.correlationId(), nodeId); + if (!request.isInternalRequest) responses.add(request.disconnected(now)); + else if (request.header.apiKey() == ApiKeys.METADATA) + metadataUpdater.handleDisconnection(request.destination); } AuthenticationException authenticationException = connectionStates.authenticationException(nodeId); if (authenticationException != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index edbd72d..0bf0a69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; @@ -159,7 +160,7 @@ public class NetworkClientTest { request.correlationId(), handler.response.requestHeader().correlationId()); } - private void maybeSetExpectedApiVersionsResponse() { + private void setExpectedApiVersionsResponse() { ApiVersionsResponse response = ApiVersionsResponse.defaultApiVersionsResponse(); short apiVersionsResponseVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion; ByteBuffer buffer = response.serialize(apiVersionsResponseVersion, new ResponseHeader(0)); @@ -168,7 +169,7 @@ public class NetworkClientTest { private void awaitReady(NetworkClient client, Node node) { if (client.discoverBrokerVersions()) { - maybeSetExpectedApiVersionsResponse(); + setExpectedApiVersionsResponse(); } while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); @@ -185,11 +186,15 @@ public class NetworkClientTest { ClientRequest request = client.newClientRequest( node.idString(), builder, now, true, handler); client.send(request, now); + // sleeping to make sure that the time since last send is greater than requestTimeOut time.sleep(3000); - client.poll(3000, time.milliseconds()); - assertEquals(1, selector.disconnected().size()); - assertTrue("Node not found in disconnected map", selector.disconnected().containsKey(node.idString())); + List<ClientResponse> responses = client.poll(3000, time.milliseconds()); + + assertEquals(1, responses.size()); + ClientResponse clientResponse = responses.get(0); + assertEquals(node.idString(), clientResponse.destination()); + assertTrue("Expected response to fail due to disconnection", clientResponse.wasDisconnected()); } @Test @@ -207,13 +212,12 @@ public class NetworkClientTest { time.sleep(reconnectBackoffMsTest); // CLOSE node - selector.close(node.idString()); + selector.serverDisconnect(node.idString()); client.poll(1, time.milliseconds()); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); leastNode = client.leastLoadedNode(time.milliseconds()); assertEquals("There should be NO leastloadednode", leastNode, null); - } @Test @@ -238,7 +242,7 @@ public class NetworkClientTest { public void testConnectionDelayDisconnectedWithNoExponentialBackoff() { awaitReady(clientWithNoExponentialBackoff, node); - selector.close(node.idString()); + selector.serverDisconnect(node.idString()); clientWithNoExponentialBackoff.poll(requestTimeoutMs, time.milliseconds()); long delay = clientWithNoExponentialBackoff.connectionDelay(node, time.milliseconds()); @@ -250,7 +254,7 @@ public class NetworkClientTest { // Start connecting and disconnect before the connection is established client.ready(node, time.milliseconds()); - selector.close(node.idString()); + selector.serverDisconnect(node.idString()); client.poll(requestTimeoutMs, time.milliseconds()); // Second attempt should have the same behaviour as exponential backoff is disabled @@ -280,7 +284,7 @@ public class NetworkClientTest { awaitReady(client, node); // First disconnection - selector.close(node.idString()); + selector.serverDisconnect(node.idString()); client.poll(requestTimeoutMs, time.milliseconds()); long delay = client.connectionDelay(node, time.milliseconds()); long expectedDelay = reconnectBackoffMsTest; @@ -293,7 +297,7 @@ public class NetworkClientTest { // Start connecting and disconnect before the connection is established client.ready(node, time.milliseconds()); - selector.close(node.idString()); + selector.serverDisconnect(node.idString()); client.poll(requestTimeoutMs, time.milliseconds()); // Second attempt should take twice as long with twice the jitter @@ -325,6 +329,28 @@ public class NetworkClientTest { } @Test + public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception { + awaitInFlightApiVersionRequest(); + selector.serverDisconnect(node.idString()); + + // The failed ApiVersion request should not be forwarded to upper layers + List<ClientResponse> responses = client.poll(0, time.milliseconds()); + assertFalse(client.hasInFlightRequests(node.idString())); + assertTrue(responses.isEmpty()); + } + + @Test + public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception { + awaitInFlightApiVersionRequest(); + client.disconnect(node.idString()); + assertFalse(client.hasInFlightRequests(node.idString())); + + // The failed ApiVersion request should not be forwarded to upper layers + List<ClientResponse> responses = client.poll(0, time.milliseconds()); + assertTrue(responses.isEmpty()); + } + + @Test public void testCallDisconnect() throws Exception { awaitReady(client, node); assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(), @@ -345,6 +371,18 @@ public class NetworkClientTest { assertTrue(client.canConnect(node, time.milliseconds())); } + private void awaitInFlightApiVersionRequest() throws Exception { + client.ready(node, time.milliseconds()); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + client.poll(0, time.milliseconds()); + return client.hasInFlightRequests(node.idString()); + } + }, 1000, ""); + assertFalse(client.isReady(node, time.milliseconds())); + } + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/test/MockSelector.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index 225aba4..6fc1b1b 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -19,6 +19,7 @@ package org.apache.kafka.test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -63,7 +64,11 @@ public class MockSelector implements Selectable { @Override public void close(String id) { - this.disconnected.put(id, ChannelState.LOCAL_CLOSE); + // Note that there are no notifications for client-side disconnects + + removeSendsForNode(id, completedSends); + removeSendsForNode(id, initiatedSends); + for (int i = 0; i < this.connected.size(); i++) { if (this.connected.get(i).equals(id)) { this.connected.remove(i); @@ -72,6 +77,24 @@ public class MockSelector implements Selectable { } } + /** + * Simulate a server disconnect. This id will be present in {@link #disconnected()} on + * the next {@link #poll(long)}. + */ + public void serverDisconnect(String id) { + this.disconnected.put(id, ChannelState.READY); + close(id); + } + + private void removeSendsForNode(String id, Collection<Send> sends) { + Iterator<Send> iter = sends.iterator(); + while (iter.hasNext()) { + Send send = iter.next(); + if (id.equals(send.destination())) + iter.remove(); + } + } + public void clear() { this.completedSends.clear(); this.completedReceives.clear();