Repository: kafka
Updated Branches:
  refs/heads/1.0 305e7949c -> 46fc8eca9


KAFKA-6289; NetworkClient should not expose failed internal ApiVersions requests

The NetworkClient internally ApiVersion requests to each broker following 
connection establishment. If this request happens to fail (perhaps due to an 
incompatible broker), the NetworkClient includes the response in the result of 
poll(). Applications will generally not be expecting this response which may 
lead to failed assertions (or in the case of AdminClient, an obscure log 
message).

I've added test cases which await the ApiVersion request sent by NetworkClient 
to be in-flight, and then disconnect the connection and verify that the 
response is not included from poll().

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>

Closes #4280 from hachikuji/KAFKA-6289


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46fc8eca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46fc8eca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46fc8eca

Branch: refs/heads/1.0
Commit: 46fc8eca933cdbce705474f595ef7d67e59364b3
Parents: 305e794
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Dec 8 10:54:31 2017 +0000
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Fri Dec 8 10:59:24 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 10 ++--
 .../apache/kafka/clients/NetworkClientTest.java | 60 ++++++++++++++++----
 .../org/apache/kafka/test/MockSelector.java     | 25 +++++++-
 3 files changed, 78 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index ee7258a..ea4eacc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -618,12 +618,12 @@ public class NetworkClient implements KafkaClient {
                 break; // Disconnections in other states are logged at debug 
level in Selector
         }
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) 
{
-            log.trace("Cancelled request {} with correlation id {} due to node 
{} being disconnected", request.request,
-                    request.header.correlationId(), nodeId);
-            if (request.isInternalRequest && request.header.apiKey() == 
ApiKeys.METADATA)
-                metadataUpdater.handleDisconnection(request.destination);
-            else
+            log.trace("Cancelled request {} {} with correlation id {} due to 
node {} being disconnected",
+                    request.header.apiKey(), request.request, 
request.header.correlationId(), nodeId);
+            if (!request.isInternalRequest)
                 responses.add(request.disconnected(now));
+            else if (request.header.apiKey() == ApiKeys.METADATA)
+                metadataUpdater.handleDisconnection(request.destination);
         }
         AuthenticationException authenticationException = 
connectionStates.authenticationException(nodeId);
         if (authenticationException != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index edbd72d..0bf0a69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -159,7 +160,7 @@ public class NetworkClientTest {
                 request.correlationId(), 
handler.response.requestHeader().correlationId());
     }
 
-    private void maybeSetExpectedApiVersionsResponse() {
+    private void setExpectedApiVersionsResponse() {
         ApiVersionsResponse response = 
ApiVersionsResponse.defaultApiVersionsResponse();
         short apiVersionsResponseVersion = 
response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
         ByteBuffer buffer = response.serialize(apiVersionsResponseVersion, new 
ResponseHeader(0));
@@ -168,7 +169,7 @@ public class NetworkClientTest {
 
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
-            maybeSetExpectedApiVersionsResponse();
+            setExpectedApiVersionsResponse();
         }
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
@@ -185,11 +186,15 @@ public class NetworkClientTest {
         ClientRequest request = client.newClientRequest(
                 node.idString(), builder, now, true, handler);
         client.send(request, now);
+
         // sleeping to make sure that the time since last send is greater than 
requestTimeOut
         time.sleep(3000);
-        client.poll(3000, time.milliseconds());
-        assertEquals(1, selector.disconnected().size());
-        assertTrue("Node not found in disconnected map", 
selector.disconnected().containsKey(node.idString()));
+        List<ClientResponse> responses = client.poll(3000, 
time.milliseconds());
+
+        assertEquals(1, responses.size());
+        ClientResponse clientResponse = responses.get(0);
+        assertEquals(node.idString(), clientResponse.destination());
+        assertTrue("Expected response to fail due to disconnection", 
clientResponse.wasDisconnected());
     }
 
     @Test
@@ -207,13 +212,12 @@ public class NetworkClientTest {
         time.sleep(reconnectBackoffMsTest);
         
         // CLOSE node 
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         
         client.poll(1, time.milliseconds());
         assertFalse("After we forced the disconnection the client is no longer 
ready.", client.ready(node, time.milliseconds()));
         leastNode = client.leastLoadedNode(time.milliseconds());
         assertEquals("There should be NO leastloadednode", leastNode, null);
-        
     }
 
     @Test
@@ -238,7 +242,7 @@ public class NetworkClientTest {
     public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
         awaitReady(clientWithNoExponentialBackoff, node);
 
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         clientWithNoExponentialBackoff.poll(requestTimeoutMs, 
time.milliseconds());
         long delay = clientWithNoExponentialBackoff.connectionDelay(node, 
time.milliseconds());
 
@@ -250,7 +254,7 @@ public class NetworkClientTest {
 
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
 
         // Second attempt should have the same behaviour as exponential 
backoff is disabled
@@ -280,7 +284,7 @@ public class NetworkClientTest {
         awaitReady(client, node);
 
         // First disconnection
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
         long delay = client.connectionDelay(node, time.milliseconds());
         long expectedDelay = reconnectBackoffMsTest;
@@ -293,7 +297,7 @@ public class NetworkClientTest {
 
         // Start connecting and disconnect before the connection is established
         client.ready(node, time.milliseconds());
-        selector.close(node.idString());
+        selector.serverDisconnect(node.idString());
         client.poll(requestTimeoutMs, time.milliseconds());
 
         // Second attempt should take twice as long with twice the jitter
@@ -325,6 +329,28 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testServerDisconnectAfterInternalApiVersionRequest() throws 
Exception {
+        awaitInFlightApiVersionRequest();
+        selector.serverDisconnect(node.idString());
+
+        // The failed ApiVersion request should not be forwarded to upper 
layers
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertFalse(client.hasInFlightRequests(node.idString()));
+        assertTrue(responses.isEmpty());
+    }
+
+    @Test
+    public void testClientDisconnectAfterInternalApiVersionRequest() throws 
Exception {
+        awaitInFlightApiVersionRequest();
+        client.disconnect(node.idString());
+        assertFalse(client.hasInFlightRequests(node.idString()));
+
+        // The failed ApiVersion request should not be forwarded to upper 
layers
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertTrue(responses.isEmpty());
+    }
+
+    @Test
     public void testCallDisconnect() throws Exception {
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + 
node.idString(),
@@ -345,6 +371,18 @@ public class NetworkClientTest {
         assertTrue(client.canConnect(node, time.milliseconds()));
     }
 
+    private void awaitInFlightApiVersionRequest() throws Exception {
+        client.ready(node, time.milliseconds());
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                client.poll(0, time.milliseconds());
+                return client.hasInFlightRequests(node.idString());
+            }
+        }, 1000, "");
+        assertFalse(client.isReady(node, time.milliseconds()));
+    }
+
     private static class TestCallbackHandler implements 
RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;

http://git-wip-us.apache.org/repos/asf/kafka/blob/46fc8eca/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java 
b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 225aba4..6fc1b1b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -63,7 +64,11 @@ public class MockSelector implements Selectable {
 
     @Override
     public void close(String id) {
-        this.disconnected.put(id, ChannelState.LOCAL_CLOSE);
+        // Note that there are no notifications for client-side disconnects
+
+        removeSendsForNode(id, completedSends);
+        removeSendsForNode(id, initiatedSends);
+
         for (int i = 0; i < this.connected.size(); i++) {
             if (this.connected.get(i).equals(id)) {
                 this.connected.remove(i);
@@ -72,6 +77,24 @@ public class MockSelector implements Selectable {
         }
     }
 
+    /**
+     * Simulate a server disconnect. This id will be present in {@link 
#disconnected()} on
+     * the next {@link #poll(long)}.
+     */
+    public void serverDisconnect(String id) {
+        this.disconnected.put(id, ChannelState.READY);
+        close(id);
+    }
+
+    private void removeSendsForNode(String id, Collection<Send> sends) {
+        Iterator<Send> iter = sends.iterator();
+        while (iter.hasNext()) {
+            Send send = iter.next();
+            if (id.equals(send.destination()))
+                iter.remove();
+        }
+    }
+
     public void clear() {
         this.completedSends.clear();
         this.completedReceives.clear();

Reply via email to