This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new aef800a  KAFKA-12462: proceed with task revocation in case of thread 
in PENDING_SHUTDOWN (#10311)
aef800a is described below

commit aef800af84ead23eda631d7f4ad49bf269a1f830
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Mar 12 20:06:54 2021 -0800

    KAFKA-12462: proceed with task revocation in case of thread in 
PENDING_SHUTDOWN (#10311)
    
    Always invoke TaskManager#handleRevocation when the thread is in 
PENDING_SHUTDOWN
    
    Reviewers: Walker Carlson <[email protected]>
---
 .../org/apache/kafka/streams/processor/internals/StreamThread.java    | 2 +-
 .../kafka/streams/processor/internals/StreamsRebalanceListener.java   | 4 +++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 12f1e26..1b668f2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -629,7 +629,7 @@ public class StreamThread extends Thread {
         // Should only proceed when the thread is still running after 
#pollRequests(), because no external state mutation
         // could affect the task manager state beyond this point within 
#runOnce().
         if (!isRunning()) {
-            log.debug("Thread state is already {}, skipping the run once call 
after poll request", state);
+            log.info("Thread state is already {}, skipping the run once call 
after poll request", state);
             return;
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index c23eab6..b314d8f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -80,7 +80,9 @@ public class StreamsRebalanceListener implements 
ConsumerRebalanceListener {
                   taskManager.activeTaskIds(),
                   taskManager.standbyTaskIds());
 
-        if (streamThread.setState(State.PARTITIONS_REVOKED) != null && 
!partitions.isEmpty()) {
+        // We need to still invoke handleRevocation if the thread has been 
told to shut down, but we shouldn't ever
+        // transition away from PENDING_SHUTDOWN once it's been initiated (to 
anything other than DEAD)
+        if ((streamThread.setState(State.PARTITIONS_REVOKED) != null || 
streamThread.state() == State.PENDING_SHUTDOWN) && !partitions.isEmpty()) {
             final long start = time.milliseconds();
             try {
                 taskManager.handleRevocation(partitions);

Reply via email to