[ 
https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382469#comment-16382469
 ] 

ASF GitHub Bot commented on KAFKA-6593:
---------------------------------------

hachikuji closed pull request #4625: KAFKA-6593; Fix livelock with consumer 
heartbeat thread in commitSync
URL: https://github.com/apache/kafka/pull/4625
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0dff7..2daadddee63 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,7 +231,7 @@ protected synchronized boolean ensureCoordinatorReady(long 
startTimeMs, long tim
             } else if (coordinator != null && 
client.connectionFailed(coordinator)) {
                 // we found the coordinator, but the connection has failed, so 
mark
                 // it dead and backoff before retrying discovery
-                coordinatorDead();
+                markCoordinatorUnknown();
                 time.sleep(retryBackoffMs);
             }
 
@@ -487,7 +487,7 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture<ByteBuffer> fut
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
-                coordinatorDead();
+                markCoordinatorUnknown();
                 log.debug("Attempt to join group failed due to obsolete 
coordinator information: {}", error.message());
                 future.raise(error);
             } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
@@ -550,7 +550,7 @@ public void handle(SyncGroupResponse syncResponse,
                 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                    log.debug("SyncGroup failed due to group rebalance");
+                    log.debug("SyncGroup failed because the group began 
another rebalance");
                     future.raise(error);
                 } else if (error == Errors.UNKNOWN_MEMBER_ID
                         || error == Errors.ILLEGAL_GENERATION) {
@@ -559,8 +559,8 @@ public void handle(SyncGroupResponse syncResponse,
                     future.raise(error);
                 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                         || error == Errors.NOT_COORDINATOR) {
-                    log.debug("SyncGroup failed:", error.message());
-                    coordinatorDead();
+                    log.debug("SyncGroup failed: {}", error.message());
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else {
                     future.raise(new KafkaException("Unexpected error from 
SyncGroup: " + error.message()));
@@ -627,27 +627,34 @@ public void onFailure(RuntimeException e, 
RequestFuture<Void> future) {
      * @return true if the coordinator is unknown
      */
     public boolean coordinatorUnknown() {
-        return coordinator() == null;
+        return checkAndGetCoordinator() == null;
     }
 
     /**
-     * Get the current coordinator
+     * Get the coordinator if its connection is still active. Otherwise mark 
it unknown and
+     * return null.
+     *
      * @return the current coordinator or null if it is unknown
      */
-    protected synchronized Node coordinator() {
+    protected synchronized Node checkAndGetCoordinator() {
         if (coordinator != null && client.connectionFailed(coordinator)) {
-            coordinatorDead();
+            markCoordinatorUnknown(true);
             return null;
         }
         return this.coordinator;
     }
 
-    /**
-     * Mark the current coordinator as dead.
-     */
-    protected synchronized void coordinatorDead() {
+    private synchronized Node coordinator() {
+        return this.coordinator;
+    }
+
+    protected synchronized void markCoordinatorUnknown() {
+        markCoordinatorUnknown(false);
+    }
+
+    protected synchronized void markCoordinatorUnknown(boolean isDisconnected) 
{
         if (this.coordinator != null) {
-            log.info("Marking the coordinator {} dead", this.coordinator);
+            log.info("Group coordinator {} is unavailable or invalid, will 
attempt rediscovery", this.coordinator);
             Node oldCoordinator = this.coordinator;
 
             // Mark the coordinator dead before disconnecting requests since 
the callbacks for any pending
@@ -656,8 +663,9 @@ protected synchronized void coordinatorDead() {
             this.coordinator = null;
 
             // Disconnect from the coordinator to ensure that there are no 
in-flight requests remaining.
-            // Pending callbacks will be invoked with a DisconnectException.
-            client.disconnect(oldCoordinator);
+            // Pending callbacks will be invoked with a DisconnectException on 
the next call to poll.
+            if (!isDisconnected)
+                client.disconnectAsync(oldCoordinator);
         }
     }
 
@@ -708,7 +716,7 @@ protected void close(long timeoutMs) {
                 // interrupted using wakeup) and the leave group request which 
have been queued, but not
                 // yet sent to the broker. Wait up to close timeout for these 
pending requests to be processed.
                 // If coordinator is not known, requests are aborted.
-                Node coordinator = coordinator();
+                Node coordinator = checkAndGetCoordinator();
                 if (coordinator != null && 
!client.awaitPendingRequests(coordinator, timeoutMs))
                     log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections",
                             client.pendingRequestCount(coordinator));
@@ -769,7 +777,7 @@ public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture<Void> futu
                     || error == Errors.NOT_COORDINATOR) {
                 log.debug("Attempt to heartbeat since coordinator {} is either 
not started or not valid.",
                         coordinator());
-                coordinatorDead();
+                markCoordinatorUnknown();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                 log.debug("Attempt to heartbeat failed since group is 
rebalancing");
@@ -800,7 +808,7 @@ public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture<Void> futu
         public void onFailure(RuntimeException e, RequestFuture<T> future) {
             // mark the coordinator as dead
             if (e instanceof DisconnectException) {
-                coordinatorDead();
+                markCoordinatorUnknown(true);
             }
             future.raise(e);
         }
@@ -948,7 +956,7 @@ public void run() {
                         } else if (heartbeat.sessionTimeoutExpired(now)) {
                             // the session timeout has expired without seeing 
a successful heartbeat, so we should
                             // probably make sure the coordinator is still 
healthy.
-                            coordinatorDead();
+                            markCoordinatorUnknown();
                         } else if (heartbeat.pollTimeoutExpired(now)) {
                             // the poll timeout has expired, which means that 
the foreground thread has stalled
                             // in between calls to poll(), so we explicitly 
leave the group.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 0aa61bbf442..2afa1ff9236 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -684,7 +684,7 @@ public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception
         if (offsets.isEmpty())
             return RequestFuture.voidSuccess();
 
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -762,7 +762,7 @@ public void handle(OffsetCommitResponse commitResponse, 
RequestFuture<Void> futu
                     } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                             || error == Errors.NOT_COORDINATOR
                             || error == Errors.REQUEST_TIMED_OUT) {
-                        coordinatorDead();
+                        markCoordinatorUnknown();
                         future.raise(error);
                         return;
                     } else if (error == Errors.UNKNOWN_MEMBER_ID
@@ -799,7 +799,7 @@ public void handle(OffsetCommitResponse commitResponse, 
RequestFuture<Void> futu
      * @return A request future containing the committed offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> 
sendOffsetFetchRequest(Set<TopicPartition> partitions) {
-        Node coordinator = coordinator();
+        Node coordinator = checkAndGetCoordinator();
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
@@ -825,7 +825,7 @@ public void handle(OffsetFetchResponse response, 
RequestFuture<Map<TopicPartitio
                     future.raise(error);
                 } else if (error == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
-                    coordinatorDead();
+                    markCoordinatorUnknown();
                     future.raise(error);
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     future.raise(new GroupAuthorizationException(groupId));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0747e8db146..fb393a54c7e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -44,6 +44,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Higher level consumer access to the network layer with basic support for 
request futures. This class
@@ -65,10 +66,15 @@
     private final long unsentExpiryMs;
     private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
 
+    // We do not need high throughput, so use a fair lock to try to avoid 
starvation
+    private final ReentrantLock lock = new ReentrantLock(true);
+
     // when requests complete, they are transferred to this queue prior to 
invocation. The purpose
     // is to avoid invoking them while holding this object's monitor which can 
open the door for deadlocks.
     private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> 
pendingCompletion = new ConcurrentLinkedQueue<>();
 
+    private final ConcurrentLinkedQueue<Node> pendingDisconnects = new 
ConcurrentLinkedQueue<>();
+
     // this flag allows the client to be safely woken up without waiting on 
the lock above. It is
     // atomic to avoid the need to acquire the lock above in order to enable 
it concurrently.
     private final AtomicBoolean wakeup = new AtomicBoolean(false);
@@ -113,12 +119,22 @@ public ConsumerNetworkClient(LogContext logContext,
         return completionHandler.future;
     }
 
-    public synchronized Node leastLoadedNode() {
-        return client.leastLoadedNode(time.milliseconds());
+    public Node leastLoadedNode() {
+        lock.lock();
+        try {
+            return client.leastLoadedNode(time.milliseconds());
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public synchronized boolean hasReadyNodes() {
-        return client.hasReadyNodes();
+    public boolean hasReadyNodes() {
+        lock.lock();
+        try {
+            return client.hasReadyNodes();
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**
@@ -227,14 +243,18 @@ public void poll(long timeout, long now, PollCondition 
pollCondition, boolean di
         // there may be handlers which need to be invoked if we woke up the 
previous call to poll
         firePendingCompletedRequests();
 
-        synchronized (this) {
+        lock.lock();
+        try {
+            // Handle async disconnects prior to attempting any sends
+            handlePendingDisconnects();
+
             // send all the requests we can send now
             trySend(now);
 
             // check whether the poll is still needed by the caller. Note that 
if the expected completion
             // condition becomes satisfied after the call to shouldBlock() 
(because of a fired completion
             // handler), the client will be woken up.
-            if (pollCondition == null || pollCondition.shouldBlock()) {
+            if (pendingCompletion.isEmpty() && (pollCondition == null || 
pollCondition.shouldBlock())) {
                 // if there are no requests in flight, do not block longer 
than the retry backoff
                 if (client.inFlightRequestCount() == 0)
                     timeout = Math.min(timeout, retryBackoffMs);
@@ -265,6 +285,8 @@ public void poll(long timeout, long now, PollCondition 
pollCondition, boolean di
 
             // clean unsent requests collection to keep the map from growing 
indefinitely
             unsent.clean();
+        } finally {
+            lock.unlock();
         }
 
         // called without the lock to avoid deadlock potential if handlers 
need to acquire locks
@@ -303,8 +325,11 @@ public boolean awaitPendingRequests(Node node, long 
timeoutMs) {
      * @return The number of pending requests
      */
     public int pendingRequestCount(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount(node) + 
client.inFlightRequestCount(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -317,8 +342,11 @@ public int pendingRequestCount(Node node) {
     public boolean hasPendingRequests(Node node) {
         if (unsent.hasRequests(node))
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests(node.idString());
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -328,8 +356,11 @@ public boolean hasPendingRequests(Node node) {
      * @return The total count of pending requests
      */
     public int pendingRequestCount() {
-        synchronized (this) {
+        lock.lock();
+        try {
             return unsent.requestCount() + client.inFlightRequestCount();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -341,8 +372,11 @@ public int pendingRequestCount() {
     public boolean hasPendingRequests() {
         if (unsent.hasRequests())
             return true;
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.hasInFlightRequests();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -386,15 +420,25 @@ private void checkDisconnects(long now) {
         }
     }
 
-    public void disconnect(Node node) {
-        failUnsentRequests(node, DisconnectException.INSTANCE);
+    private void handlePendingDisconnects() {
+        lock.lock();
+        try {
+            while (true) {
+                Node node = pendingDisconnects.poll();
+                if (node == null)
+                    break;
 
-        synchronized (this) {
-            client.disconnect(node.idString());
+                failUnsentRequests(node, DisconnectException.INSTANCE);
+                client.disconnect(node.idString());
+            }
+        } finally {
+            lock.unlock();
         }
+    }
 
-        // We need to poll to ensure callbacks from in-flight requests on the 
disconnected socket are fired
-        pollNoWakeup();
+    public void disconnectAsync(Node node) {
+        pendingDisconnects.offer(node);
+        client.wakeup();
     }
 
     private void failExpiredRequests(long now) {
@@ -408,16 +452,16 @@ private void failExpiredRequests(long now) {
 
     private void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
-        synchronized (this) {
+        lock.lock();
+        try {
             Collection<ClientRequest> unsentRequests = unsent.remove(node);
             for (ClientRequest unsentRequest : unsentRequests) {
                 RequestFutureCompletionHandler handler = 
(RequestFutureCompletionHandler) unsentRequest.callback();
                 handler.onFailure(e);
             }
+        } finally {
+            lock.unlock();
         }
-
-        // called without the lock to avoid deadlock potential
-        firePendingCompletedRequests();
     }
 
     private boolean trySend(long now) {
@@ -458,8 +502,11 @@ public void disableWakeups() {
 
     @Override
     public void close() throws IOException {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.close();
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -469,8 +516,11 @@ public void close() throws IOException {
      * @param node Node to connect to if possible
      */
     public boolean connectionFailed(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             return client.connectionFailed(node);
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -481,8 +531,11 @@ public boolean connectionFailed(Node node) {
      * @param node The node to connect to
      */
     public void tryConnect(Node node) {
-        synchronized (this) {
+        lock.lock();
+        try {
             client.ready(node, time.milliseconds());
+        } finally {
+            lock.unlock();
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index ed45e3af56d..faf9240537c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -86,6 +87,7 @@ public FutureResponse(Node node,
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
     private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = 
NodeApiVersions.create();
+    private volatile int numBlockingWakeups = 0;
 
     public MockClient(Time time) {
         this(time, null);
@@ -195,8 +197,40 @@ public void send(ClientRequest request, long now) {
         this.requests.add(request);
     }
 
+    /**
+     * Simulate a blocking poll in order to test wakeup behavior.
+     *
+     * @param numBlockingWakeups The number of polls which will block until 
woken up
+     */
+    public synchronized void enableBlockingUntilWakeup(int numBlockingWakeups) 
{
+        this.numBlockingWakeups = numBlockingWakeups;
+    }
+
+    @Override
+    public synchronized void wakeup() {
+        if (numBlockingWakeups > 0) {
+            numBlockingWakeups--;
+            notify();
+        }
+    }
+
+    private synchronized void maybeAwaitWakeup() {
+        try {
+            int remainingBlockingWakeups = numBlockingWakeups;
+            if (remainingBlockingWakeups <= 0)
+                return;
+
+            while (numBlockingWakeups == remainingBlockingWakeups)
+                wait();
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
+        maybeAwaitWakeup();
+
         List<ClientResponse> copy = new ArrayList<>(this.responses);
 
         if (metadata != null && metadata.updateRequested()) {
@@ -427,10 +461,6 @@ public ClientRequest newClientRequest(String nodeId, 
AbstractRequest.Builder<?>
                 expectResponse, callback);
     }
 
-    @Override
-    public void wakeup() {
-    }
-
     @Override
     public void close() {
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2e5c96019b9..ac392055f9b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -221,7 +221,8 @@ public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception
             });
         }
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(numRequests, responses.get());
     }
@@ -238,7 +239,7 @@ public void 
testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
         Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = 
Collections.singletonMap(
                 new TopicPartition("foo", 0), new 
OffsetCommitRequest.PartitionData(13L, ""));
-        consumerClient.send(coordinator.coordinator(), new 
OffsetCommitRequest.Builder(groupId, offsets))
+        consumerClient.send(coordinator.checkAndGetCoordinator(), new 
OffsetCommitRequest.Builder(groupId, offsets))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                     @Override
                     public void onSuccess(ClientResponse value, 
RequestFuture<Object> future) {}
@@ -251,7 +252,8 @@ public void onFailure(RuntimeException e, 
RequestFuture<Object> future) {
                     }
                 });
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
+        consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
     }
 
@@ -1033,6 +1035,7 @@ private void 
testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)
 
         client.respond(offsetCommitResponse(Collections.singletonMap(t1p, 
error)));
         consumerClient.pollNoWakeup();
+        consumerClient.pollNoWakeup(); // second poll since coordinator 
disconnect is async
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -1678,7 +1681,7 @@ public void testAutoCommitAfterCoordinatorBackToService() 
{
         subscriptions.assignFromUser(Collections.singleton(t1p));
         subscriptions.seek(t1p, 100L);
 
-        coordinator.coordinatorDead();
+        coordinator.markCoordinatorUnknown();
         assertTrue(coordinator.coordinatorUnknown());
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, 
Errors.NONE)));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 904270ec489..d0888fa5655 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -107,7 +107,8 @@ public void testDisconnectWithUnsentRequests() {
         RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
         assertTrue(consumerClient.hasPendingRequests(node));
         assertFalse(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -118,7 +119,8 @@ public void testDisconnectWithInFlightRequests() {
         consumerClient.pollNoWakeup();
         assertTrue(consumerClient.hasPendingRequests(node));
         assertTrue(client.hasInFlightRequests(node.idString()));
-        consumerClient.disconnect(node);
+        consumerClient.disconnectAsync(node);
+        consumerClient.pollNoWakeup();
         assertTrue(future.failed());
         assertTrue(future.exception() instanceof DisconnectException);
     }
@@ -205,6 +207,66 @@ public void wakeup() {
         assertTrue(future.isDone());
     }
 
+    @Test
+    public void testDisconnectWakesUpPoll() throws Exception {
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
+
+        client.enableBlockingUntilWakeup(1);
+        Thread t = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t.start();
+
+        consumerClient.disconnectAsync(node);
+        t.join();
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
+    public void testFutureCompletionOutsidePoll() throws Exception {
+        // Tests the scenario in which the request that is being awaited in 
one thread
+        // is received and completed in another thread.
+
+        final RequestFuture<ClientResponse> future = consumerClient.send(node, 
heartbeat());
+        consumerClient.pollNoWakeup(); // dequeue and send the request
+
+        client.enableBlockingUntilWakeup(2);
+        Thread t1 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.pollNoWakeup();
+            }
+        };
+        t1.start();
+
+        // Sleep a little so that t1 is blocking in poll
+        Thread.sleep(50);
+
+        Thread t2 = new Thread() {
+            @Override
+            public void run() {
+                consumerClient.poll(future);
+            }
+        };
+        t2.start();
+
+        // Sleep a little so that t2 is awaiting the network client lock
+        Thread.sleep(50);
+
+        // Simulate a network response and return from the poll in t1
+        client.respond(heartbeatResponse(Errors.NONE));
+        client.wakeup();
+
+        // Both threads should complete since t1 should wakeup t2
+        t1.join();
+        t2.join();
+        assertTrue(future.succeeded());
+    }
+
     @Test
     public void testAwaitForMetadataUpdateWithTimeout() {
         assertFalse(consumerClient.awaitMetadataUpdate(10L));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Coordinator disconnect in heartbeat thread can cause commitSync to block 
> indefinitely
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6593
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6593
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 1.0.0, 0.11.0.2
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 1.1.0
>
>         Attachments: consumer.log
>
>
> If a coordinator disconnect is observed in the heartbeat thread, it can cause 
> a pending offset commit to be cancelled just before the foreground thread 
> begins waiting on its response in poll(). Since the poll timeout is 
> Long.MAX_VALUE, this will cause the consumer to effectively hang until some 
> other network event causes the poll() to return. We try to protect this case 
> with a poll condition on the future, but this isn't bulletproof since the 
> future can be completed outside of the lock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to