This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65efb981347 KAFKA-10199: Do not process when in PARTITIONS_REVOKED
(#14265)
65efb981347 is described below
commit 65efb981347d6f81fb2713cd27cdfdfa9d8781b9
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Sep 26 15:25:30 2023 +0200
KAFKA-10199: Do not process when in PARTITIONS_REVOKED (#14265)
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transitions to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.
With the state updater enabled, processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITIONS_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.
This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
Reviewer: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 8 +-
.../processor/internals/StreamThreadTest.java | 89 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6dd4d136cf4..572fa4e353b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -260,6 +260,12 @@ public class StreamThread extends Thread {
}
}
+ public boolean isStartingRunningOrPartitionAssigned() {
+ synchronized (stateLock) {
+ return state.equals(State.RUNNING) || state.equals(State.STARTING)
|| state.equals(State.PARTITIONS_ASSIGNED);
+ }
+ }
+
private final Time time;
private final Logger log;
private final String logPrefix;
@@ -788,7 +794,7 @@ public class StreamThread extends Thread {
long totalProcessLatency = 0L;
long totalPunctuateLatency = 0L;
if (state == State.RUNNING
- || (stateUpdaterEnabled && isRunning())) {
+ || (stateUpdaterEnabled &&
isStartingRunningOrPartitionAssigned())) {
/*
* Within an iteration, after processing up to N (N initialized as
1 upon start up) records for each applicable tasks, check the current time:
* 1. If it is time to punctuate, do it;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5830265753e..1784a5335c8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -542,6 +542,95 @@ public class StreamThreadTest {
verify(taskManager);
}
+ @Test
+ public void shouldNotProcessWhenPartitionRevoked() {
+ final Properties props = configProps(false);
+
+ final StreamsConfig config = new StreamsConfig(props);
+
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+ final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ final StreamThread thread = buildStreamThread(mainConsumer,
taskManager, config, topologyMetadata);
+ thread.setState(State.STARTING);
+ thread.setState(State.PARTITIONS_REVOKED);
+ thread.runOnce();
+
+ Mockito.verify(taskManager, Mockito.never()).process(Mockito.anyInt(),
Mockito.any());
+ }
+
+ @Test
+ public void shouldProcessWhenRunning() {
+ final Properties props = configProps(false);
+
+ final StreamsConfig config = new StreamsConfig(props);
+
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+ final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ final StreamThread thread = buildStreamThread(mainConsumer,
taskManager, config, topologyMetadata);
+ thread.updateThreadMetadata("admin");
+ thread.setState(State.STARTING);
+ thread.setState(State.PARTITIONS_ASSIGNED);
+ thread.setState(State.RUNNING);
+ thread.runOnce();
+
+ Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+ }
+
+ @Test
+ public void shouldProcessWhenPartitionAssigned() {
+ final Properties props = configProps(false);
+ props.setProperty(InternalConfig.STATE_UPDATER_ENABLED,
Boolean.toString(true));
+
+ final StreamsConfig config = new StreamsConfig(props);
+
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+ final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ final StreamThread thread = buildStreamThread(mainConsumer,
taskManager, config, topologyMetadata);
+ thread.updateThreadMetadata("admin");
+ thread.setState(State.STARTING);
+ thread.setState(State.PARTITIONS_ASSIGNED);
+ thread.runOnce();
+
+ Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+ }
+
+ @Test
+ public void shouldProcessWhenStarting() {
+ final Properties props = configProps(false);
+ props.setProperty(InternalConfig.STATE_UPDATER_ENABLED,
Boolean.toString(true));
+
+ final StreamsConfig config = new StreamsConfig(props);
+
when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty());
+ final ConsumerGroupMetadata consumerGroupMetadata =
Mockito.mock(ConsumerGroupMetadata.class);
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TaskManager taskManager = Mockito.mock(TaskManager.class);
+
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ final StreamThread thread = buildStreamThread(mainConsumer,
taskManager, config, topologyMetadata);
+ thread.updateThreadMetadata("admin");
+ thread.setState(State.STARTING);
+ thread.runOnce();
+
+ Mockito.verify(taskManager).process(Mockito.anyInt(), Mockito.any());
+ }
+
@Test
public void
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws
InterruptedException {
final StreamsConfig config = new StreamsConfig(configProps(false));