This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 958f90e  KAFKA-12375: fix concurrency issue in application shutdown (#10213)
958f90e is described below

commit 958f90e710f0de164dce1dda5f45d75a8b8fb8d4
Author: Walker Carlson <[email protected]>
AuthorDate: Fri Feb 26 12:17:28 2021 -0800

    KAFKA-12375: fix concurrency issue in application shutdown (#10213)
    
    Need to ensure that `enforceRebalance` is used in a thread-safe way
    
    Reviewers: Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 .../apache/kafka/streams/processor/internals/StreamThread.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b967f5af..9fadd53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -571,6 +571,11 @@ public class StreamThread extends Thread {
         // until the rebalance is completed before we close and commit the tasks
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
+                if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+                    log.warn("Detected that shutdown was requested. " +
+                            "All clients in this app will now begin to shutdown");
+                    mainConsumer.enforceRebalance();
+                }
                 runOnce();
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
@@ -660,10 +665,7 @@ public class StreamThread extends Thread {
     }
 
     public void sendShutdownRequest(final AssignorError assignorError) {
-        log.warn("Detected that shutdown was requested. " +
-                "All clients in this app will now begin to shutdown");
         assignmentErrorCode.set(assignorError.code());
-        mainConsumer.enforceRebalance();
     }
 
     private void handleTaskMigrated(final TaskMigratedException e) {

Reply via email to