This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65efb981347 KAFKA-10199: Do not process when in PARTITIONS_REVOKED (#14265)
65efb981347 is described below

commit 65efb981347d6f81fb2713cd27cdfdfa9d8781b9
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Sep 26 15:25:30 2023 +0200

    KAFKA-10199: Do not process when in PARTITIONS_REVOKED (#14265)
    
    When a Streams application is subscribed with a pattern to
    input topics and an input topic is deleted, the stream thread
    transitions to PARTITIONS_REVOKED and a rebalance is triggered.
    This happens inside the poll call. Sometimes, the poll call
    returns before a new assignment is received. That means, Streams
    executes the poll loop in state PARTITIONS_REVOKED.
    
    With the state updater enabled, processing is also executed in states
    other than RUNNING, and so it is also executed when the
    stream thread is in state PARTITIONS_REVOKED. However, that triggers
    an IllegalStateException with the error message:
    No current assignment for partition TEST-TOPIC-A-0
    which is a fatal error.
    
    This commit prevents processing when the stream thread is in state
    PARTITIONS_REVOKED.
    
    Reviewer: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  |  8 +-
 .../processor/internals/StreamThreadTest.java      | 89 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6dd4d136cf4..572fa4e353b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -260,6 +260,12 @@ public class StreamThread extends Thread {
         }
     }
 
+    public boolean isStartingRunningOrPartitionAssigned() {
+        synchronized (stateLock) {
+            return state.equals(State.RUNNING) || state.equals(State.STARTING) 
|| state.equals(State.PARTITIONS_ASSIGNED);
+        }
+    }
+
     private final Time time;
     private final Logger log;
     private final String logPrefix;
@@ -788,7 +794,7 @@ public class StreamThread extends Thread {
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
         if (state == State.RUNNING
-            || (stateUpdaterEnabled && isRunning())) {
+            || (stateUpdaterEnabled && 
isStartingRunningOrPartitionAssigned())) {
             /*
              * Within an iteration, after processing up to N (N initialized as 
1 upon start up) records for each applicable tasks, check the current time:
              *  1. If it is time to punctuate, do it;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5830265753e..1784a5335c8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -542,6 +542,95 @@ public class StreamThreadTest {
         verify(taskManager);
     }
 
+    @Test
+    public void shouldNotProcessWhenPartitionRevoked() {
+        final Properties props = configProps(false);
+
+        final StreamsConfig config = new StreamsConfig(props);
+        
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = buildStreamThread(mainConsumer, 
taskManager, config, topologyMetadata);
+        thread.setState(State.STARTING);
+        thread.setState(State.PARTITIONS_REVOKED);
+        thread.runOnce();
+
+        Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(), 
Mockito.any());
+    }
+
+    @Test
+    public void shouldProcessWhenRunning() {
+        final Properties props = configProps(false);
+
+        final StreamsConfig config = new StreamsConfig(props);
+        
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = buildStreamThread(mainConsumer, 
taskManager, config, topologyMetadata);
+        thread.updateThreadMetadata("admin");
+        thread.setState(State.STARTING);
+        thread.setState(State.PARTITIONS_ASSIGNED);
+        thread.setState(State.RUNNING);
+        thread.runOnce();
+
+        Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+    }
+
+    @Test
+    public void shouldProcessWhenPartitionAssigned() {
+        final Properties props = configProps(false);
+        props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, 
Boolean.toString(true));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = buildStreamThread(mainConsumer, 
taskManager, config, topologyMetadata);
+        thread.updateThreadMetadata("admin");
+        thread.setState(State.STARTING);
+        thread.setState(State.PARTITIONS_ASSIGNED);
+        thread.runOnce();
+
+        Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+    }
+
+    @Test
+    public void shouldProcessWhenStarting() {
+        final Properties props = configProps(false);
+        props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, 
Boolean.toString(true));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = buildStreamThread(mainConsumer, 
taskManager, config, topologyMetadata);
+        thread.updateThreadMetadata("admin");
+        thread.setState(State.STARTING);
+        thread.runOnce();
+
+        Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+    }
+
     @Test
     public void 
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws 
InterruptedException {
         final StreamsConfig config = new StreamsConfig(configProps(false));

Reply via email to