This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 75b4d06043 KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
75b4d06043 is described below

commit 75b4d06043d1524f53ecd8a6c8321c5c5c135f52
Author: Hao Li <[email protected]>
AuthorDate: Wed Apr 13 04:49:31 2022 -0700

    KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
    
    Reviewers: Bruno Cadonna <[email protected]>, David Jacot <[email protected]>
---
 .../org/apache/kafka/streams/processor/internals/StreamThread.java  | 6 +++---
 .../apache/kafka/streams/processor/internals/StreamThreadTest.java  | 5 ++---
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7401e539c4..1685735fc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -589,7 +589,7 @@ public class StreamThread extends Thread {
                 runOnce();
                 if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                     log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
-                    mainConsumer.enforceRebalance();
+                    mainConsumer.enforceRebalance("Scheduled probing rebalance");
                     nextProbingRebalanceMs.set(Long.MAX_VALUE);
                 }
             } catch (final TaskCorruptedException e) {
@@ -601,7 +601,7 @@ public class StreamThread extends Thread {
                     final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
                     if (enforceRebalance && eosEnabled) {
                         log.info("Active task(s) got corrupted. Triggering a rebalance.");
-                        mainConsumer.enforceRebalance();
+                        mainConsumer.enforceRebalance("Active tasks corrupted");
                     }
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
@@ -643,7 +643,7 @@ public class StreamThread extends Thread {
         if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
             log.warn("Detected that shutdown was requested. " +
                     "All clients in this app will now begin to shutdown");
-            mainConsumer.enforceRebalance();
+            mainConsumer.enforceRebalance("Shutdown requested");
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c0cf9a20bd..cfbf1e5531 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -514,6 +514,7 @@ public class StreamThreadTest {
         final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
 
         mockClientSupplier.setCluster(createCluster());
+        mockConsumer.enforceRebalance("Scheduled probing rebalance");
         EasyMock.replay(mockConsumer);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
@@ -535,8 +536,6 @@ public class StreamThreadTest {
             null
         );
 
-        mockConsumer.enforceRebalance();
-
         mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
 
         thread.start();
@@ -2422,7 +2421,7 @@ public class StreamThreadTest {
         expect(task2.id()).andReturn(taskId2).anyTimes();
         expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
 
-        consumer.enforceRebalance();
+        consumer.enforceRebalance("Active tasks corrupted");
         expectLastCall();
 
         EasyMock.replay(task1, task2, taskManager, consumer);

Reply via email to