Repository: kafka
Updated Branches:
  refs/heads/0.11.0 557001f9f -> 0bf34e715


KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

The AdminClient does not properly clear calls from the callsInFlight structure.
Later, in an effort to clear the lingering call objects, it closes the
connection they are associated with. This disrupts new incoming calls,
which then get BrokerNotAvailableException.

This patch fixes this bug by properly removing completed calls from the
callsInFlight structure. It also adds the Call#aborted flag, which
ensures that we throw the right exception (TimeoutException instead of
DisconnectException) and only abort a connection once -- even if there
is a similar bug in the future which causes old Call objects to linger.
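
Before the diff, a minimal sketch of the abort-once pattern the patch introduces
may help: an expired in-flight call is marked aborted exactly once before its
connection is closed, and the failure that the disconnect later produces is
reported as a TimeoutException rather than a DisconnectException. The names
below (PendingCall, maybeTimeOut, disconnectNode) are simplified stand-ins for
illustration only, not the actual KafkaAdminClient internals; the real change
is in the KafkaAdminClient.java hunks further down.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeoutException;

    public class AbortOnceSketch {
        static final class PendingCall {
            final long deadlineMs;
            boolean aborted = false;  // set exactly once, when we decide to time the call out
            final CompletableFuture<Void> future = new CompletableFuture<>();

            PendingCall(long deadlineMs) { this.deadlineMs = deadlineMs; }

            boolean hasExpired(long nowMs) { return nowMs >= deadlineMs; }

            // Invoked when the network layer reports a failure, e.g. the disconnect we triggered.
            void fail(Throwable cause) {
                if (aborted) {
                    // We initiated the abort, so surface a timeout instead of the disconnect error.
                    future.completeExceptionally(new TimeoutException("Aborted due to timeout."));
                    return;
                }
                future.completeExceptionally(cause);
            }
        }

        // Timeout pass: close the connection for an expired call at most once.
        static void maybeTimeOut(PendingCall call, long nowMs, Runnable disconnectNode) {
            if (call.hasExpired(nowMs) && !call.aborted) {
                call.aborted = true;
                disconnectNode.run();  // the resulting failure flows back through fail(...)
            }
        }

        public static void main(String[] args) {
            PendingCall call = new PendingCall(1000L);
            maybeTimeOut(call, 5000L, () -> call.fail(new RuntimeException("disconnected")));
            // A second pass must not disconnect again, because the call is already aborted.
            maybeTimeOut(call, 6000L, () -> { throw new AssertionError("disconnected twice"); });
            System.out.println(call.future.isCompletedExceptionally());  // true, via TimeoutException
        }
    }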

Author: Colin P. Mccabe <cmcc...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3584 from cmccabe/KAFKA-5658

(cherry picked from commit c9ffab16228ecd5d931b58d93dfa3f49287d2909)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bf34e71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bf34e71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bf34e71

Branch: refs/heads/0.11.0
Commit: 0bf34e7157319bdd12ae907788bc40c70f9c7ad0
Parents: 557001f
Author: Colin P. Mccabe <cmcc...@confluent.io>
Authored: Tue Aug 8 09:38:15 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Aug 8 09:40:17 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 36 +++++++++----
 .../clients/admin/KafkaAdminClientTest.java     | 57 ++++++++++++++++++++
 .../clients/admin/MockKafkaAdminClientEnv.java  | 18 +++++--
 4 files changed, 99 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7f98820..0a1616a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -61,7 +61,7 @@
               files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
-              files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
+              files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
 
     <suppress checks="JavaNCSS"
               files="RequestResponseTest.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2129059..ed32bff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -322,9 +322,8 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
-    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) {
+    static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata, Time time) {
         Metrics metrics = null;
-        Time time = Time.SYSTEM;
         String clientId = generateClientId(config);
 
         try {
@@ -441,6 +440,7 @@ public class KafkaAdminClient extends AdminClient {
         private final long deadlineMs;
         private final NodeProvider nodeProvider;
         private int tries = 0;
+        private boolean aborted = false;
 
         Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
             this.callName = callName;
@@ -459,6 +459,14 @@ public class KafkaAdminClient extends AdminClient {
          * @param throwable     The failure exception.
          */
         final void fail(long now, Throwable throwable) {
+            if (aborted) {
+                if (log.isDebugEnabled()) {
+                    log.debug("{} aborted at {} after {} attempt(s)", this, now, tries,
+                        new Exception(prettyPrintException(throwable)));
+                }
+                handleFailure(new TimeoutException("Aborted due to timeout."));
+                return;
+            }
             // If this is an UnsupportedVersionException that we can retry, do so.
             if ((throwable instanceof UnsupportedVersionException) &&
                      handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
@@ -792,12 +800,17 @@ public class KafkaAdminClient extends AdminClient {
                 // only one we need to check the timeout for.
                 Call call = contexts.get(0);
                 if (processor.callHasExpired(call)) {
-                    log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
-                    client.disconnect(nodeId);
-                    numTimedOut++;
-                    // We don't remove anything from the callsInFlight data structure.  Because the connection
-                    // has been closed, the calls should be returned by the next client#poll(),
-                    // and handled at that point.
+                    if (call.aborted) {
+                        log.warn("{}: aborted call {} is still in callsInFlight.", clientId, call);
+                    } else {
+                        log.debug("{}: Closing connection to {} to time out {}", clientId, nodeId, call);
+                        call.aborted = true;
+                        client.disconnect(nodeId);
+                        numTimedOut++;
+                        // We don't remove anything from the callsInFlight data structure.  Because the connection
+                        // has been closed, the calls should be returned by the next client#poll(),
+                        // and handled at that point.
+                    }
                 }
             }
             if (numTimedOut > 0)
@@ -830,7 +843,12 @@ public class KafkaAdminClient extends AdminClient {
 
                 // Stop tracking this call.
                 correlationIdToCall.remove(correlationId);
-                getOrCreateListValue(callsInFlight, response.requestHeader().clientId()).remove(call);
+                List<Call> calls = callsInFlight.get(response.destination());
+                if ((calls == null) || (!calls.remove(call))) {
+                    log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " +
+                        "that did not exist in callsInFlight", response.destination(), call);
+                    continue;
+                }
 
                 // Handle the result of the call.  This may involve retrying the call, if we got a
                 // retriable exception.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8300e0f..d0adb6a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
@@ -39,9 +40,13 @@ import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -59,6 +64,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -300,6 +306,57 @@ public class KafkaAdminClientTest {
         }
     }
 
+    /**
+     * Test handling timeouts.
+     */
+    @Test
+    public void testHandleTimeout() throws Exception {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        MockTime time = new MockTime();
+        nodes.put(0, new Node(0, "localhost", 8121));
+        Cluster cluster = new Cluster("mockClusterId", nodes.values(),
+            Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
+            Collections.<String>emptySet(), nodes.get(0));
+        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(time, cluster,
+            AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
+                AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(nodes.get(0));
+
+            // Make a request with an extremely short timeout.
+            // Then wait for it to fail by not supplying any response.
+            log.info("Starting AdminClient#listTopics...");
+            final ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000));
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return env.kafkaClient().hasInFlightRequests();
+                }
+            }, "Timed out waiting for inFlightRequests");
+            time.sleep(5000);
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return result.listings().isDone();
+                }
+            }, "Timed out waiting for listTopics to complete");
+            assertFutureError(result.listings(), TimeoutException.class);
+            log.info("Verified the error result of AdminClient#listTopics");
+
+            // The next request should succeed.
+            time.sleep(5000);
+            env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
+                Collections.singletonMap(new org.apache.kafka.common.requests.Resource(TOPIC, "foo"),
+                    new DescribeConfigsResponse.Config(ApiError.NONE,
+                        Collections.<DescribeConfigsResponse.ConfigEntry>emptySet()))));
+            DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
+                new ConfigResource(ConfigResource.Type.TOPIC, "foo")));
+            time.sleep(5000);
+            result2.values().get(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get();
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, collection.contains(element));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
index 6c1fd17..6648467 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -36,6 +36,7 @@ import java.util.Map;
  * When finished, be sure to {@link #close() close} the environment object.
  */
 public class MockKafkaAdminClientEnv implements AutoCloseable {
+    private final Time time;
     private final AdminClientConfig adminClientConfig;
     private final Metadata metadata;
     private final MockClient mockClient;
@@ -43,16 +44,25 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
     private final Cluster cluster;
 
     public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
-        this(cluster, newStrMap(vals));
+        this(Time.SYSTEM, cluster, vals);
     }
 
-    public MockKafkaAdminClientEnv(Cluster cluster, Map<String, Object> config) {
+    public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) {
+        this(time, cluster, newStrMap(vals));
+    }
+
+    public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, Object> config) {
+        this.time = time;
         this.adminClientConfig = new AdminClientConfig(config);
         this.cluster = cluster;
         this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
-        this.mockClient = new MockClient(Time.SYSTEM, this.metadata);
-        this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata);
+        this.mockClient = new MockClient(time, this.metadata);
+        this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata, time);
+    }
+
+    public Time time() {
+        return time;
     }
 
     public Cluster cluster() {
