This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2baad13cf1a KAFKA-19521: Fix consumption and leaving group when source
topic is deleted (#20735)
2baad13cf1a is described below
commit 2baad13cf1a18c8dc55be3e081de0ba80fe14e2f
Author: Jinhe Zhang <[email protected]>
AuthorDate: Wed Oct 22 04:40:01 2025 -0400
KAFKA-19521: Fix consumption and leaving group when source topic is deleted
(#20735)
Integration tests expecting `MissingSourceTopicException` were timing
out and taking 5+ minutes because [PR
#20284](https://github.com/apache/kafka/pull/20284) set the missing
topics timeout to `max.poll.interval.ms` (default 300 seconds).
This is inappropriate because:
- When source topics don't exist, the thread actually polls every `poll.ms`
(100ms), not every `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat
frequency
## Solution
Use `2 * heartbeatIntervalMs` (provided by the broker via KIP-1071) as the
timeout instead (see the sketch after this list):
- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests
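As a rough sketch of the intended timeout selection (illustrative only; the class and method names are hypothetical, and the exact fallback condition is an assumption based on the bullets above rather than code taken verbatim from the diff below):

```java
// Hypothetical helper illustrating the timeout selection described above.
final class MissingTopicsTimeoutSketch {

    // heartbeatIntervalMs is -1 until the first streams group heartbeat response
    // arrives (see StreamsRebalanceData#heartbeatIntervalMs() in the diff below).
    static long timeoutMs(final int heartbeatIntervalMs, final long maxPollIntervalMs) {
        if (heartbeatIntervalMs > 0) {
            // Broker-provided interval (KIP-1071): waiting two intervals guarantees that
            // at least one heartbeat is sent before MissingSourceTopicException is raised.
            return 2L * heartbeatIntervalMs;
        }
        // Backward-compatible fallback when no heartbeat interval has been received yet.
        return maxPollIntervalMs;
    }
}
```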
## Changes
1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in
`StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
"classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`,
`StreamsGroupHeartbeatRequestManagerTest`, and `StreamThreadTest` to cover the
new `StreamsRebalanceData` behaviour
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 1 +
.../consumer/internals/StreamsRebalanceData.java | 13 ++++++++
.../StreamsGroupHeartbeatRequestManagerTest.java | 29 +++++++++++++++++
.../internals/StreamsRebalanceDataTest.java | 37 ++++++++++++++++++++++
...HandlingSourceTopicDeletionIntegrationTest.java | 9 ++++--
.../streams/processor/internals/StreamThread.java | 8 +++--
.../processor/internals/StreamThreadTest.java | 26 +++++++++------
7 files changed, 108 insertions(+), 15 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index ceeeb6c1916..cb9a38d0ddc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -527,6 +527,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
        heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
        heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
        heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
+        streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
        if (data.partitionsByUserEndpoint() != null) {
            streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 2fe7ae8ad35..c6fe1fd9215 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -30,6 +30,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -329,6 +330,8 @@ public class StreamsRebalanceData {
    private final AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new AtomicReference<>(List.of());
+    private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
+
    public StreamsRebalanceData(final UUID processId,
                                final Optional<HostInfo> endpoint,
                                final Map<String, Subtopology> subtopologies,
@@ -395,4 +398,14 @@ public class StreamsRebalanceData {
return statuses.get();
}
+    /** Updated whenever a heartbeat response is received from the broker. */
+    public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
+        this.heartbeatIntervalMs.set(heartbeatIntervalMs);
+    }
+
+    /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */
+    public int heartbeatIntervalMs() {
+        return heartbeatIntervalMs.get();
+    }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f4a2726b9e5..9e4b8437144 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -1507,6 +1507,35 @@ class StreamsGroupHeartbeatRequestManagerTest {
}
}
+    @Test
+    public void testStreamsRebalanceDataHeartbeatIntervalMsUpdatedOnSuccess() {
+        try (
+            final MockedConstruction<HeartbeatRequestState> ignored = mockConstruction(
+                HeartbeatRequestState.class,
+                (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true))
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager();
+            when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+            when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+            when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+            // Initially, heartbeatIntervalMs should be -1
+            assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs());
+
+            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+            assertEquals(1, result.unsentRequests.size());
+
+            final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);
+            final ClientResponse response = buildClientResponse();
+            networkRequest.handler().onComplete(response);
+
+            // After successful response, heartbeatIntervalMs should be updated
+            assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, streamsRebalanceData.heartbeatIntervalMs());
+        }
+    }
+
    private static ConsumerConfig config() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 606ba0b7350..f2376640c01 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -437,4 +437,41 @@ public class StreamsRebalanceDataTest {
assertTrue(streamsRebalanceData.statuses().isEmpty());
}
+    @Test
+    public void streamsRebalanceDataShouldBeConstructedWithHeartbeatIntervalMsSetToMinusOne() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new
+            StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = Map.of();
+        final Map<String, String> clientTags = Map.of("clientTag1",
+            "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+
+        assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs());
+    }
+
+    @Test
+    public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = Optional.of(new
+            StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = Map.of();
+        final Map<String, String> clientTags = Map.of("clientTag1",
+            "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+            processId,
+            endpoint,
+            subtopologies,
+            clientTags
+        );
+
+        streamsRebalanceData.setHeartbeatIntervalMs(1000);
+        assertEquals(1000, streamsRebalanceData.heartbeatIntervalMs());
+    }
+
}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
index d8f9061dfdb..f31e79c5399 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
@@ -33,8 +33,9 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Properties;
@@ -75,8 +76,9 @@ public class HandlingSourceTopicDeletionIntegrationTest {
CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
}
-    @Test
-    public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldThrowErrorAfterSourceTopicDeleted(final String groupProtocol, final TestInfo testName) throws InterruptedException {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
@@ -91,6 +93,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
        streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+        streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
        final Topology topology = builder.build();
        final AtomicBoolean calledUncaughtExceptionHandler1 = new AtomicBoolean(false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3521b31d8a3..f208567c32d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1587,11 +1587,15 @@ public class StreamThread extends Thread implements ProcessingThread {
    }
    private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
+        // Using 2 * heartbeatIntervalMs as the timeout ensures at least one heartbeat is sent before raising the exception
+        final int heartbeatIntervalMs = streamsRebalanceData.get().heartbeatIntervalMs();
+        final long timeoutMs = 2L * heartbeatIntervalMs;
+
        // Start timeout tracking on first encounter with missing topics
        if (topicsReadyTimer == null) {
-            topicsReadyTimer = time.timer(maxPollTimeMs);
+            topicsReadyTimer = time.timer(timeoutMs);
            log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.",
-                missingTopicsDetail, maxPollTimeMs);
+                missingTopicsDetail, timeoutMs);
        } else {
            topicsReadyTimer.update();
        }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d7f61883adb..66b90ffcc03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3966,11 +3966,13 @@ public class StreamThreadTest {
.setStatusDetail("Missing source topics")
));
+ streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
        // Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
-        mockTime.sleep(300001);
+        mockTime.sleep(10001);
        final MissingSourceTopicException exception =
            assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().contains("Missing source topics"));
@@ -4153,11 +4155,13 @@ public class StreamThreadTest {
.setStatusDetail("Missing source topics")
));
+ streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
// First call should not throw exception (within timeout)
thread.runOnceWithProcessingThreads();
-        // Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
-        mockTime.sleep(300001);
+        // Advance time beyond 2 * heartbeatIntervalMs (5000ms in this test) to trigger timeout
+        mockTime.sleep(10001);
        final MissingSourceTopicException exception =
            assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
assertTrue(exception.getMessage().contains("Missing source topics"));
@@ -4221,11 +4225,13 @@ public class StreamThreadTest {
.setStatusDetail("Missing source topics")
));
+ streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
// Advance time but not beyond timeout
-        mockTime.sleep(150000); // Half of max.poll.interval.ms
+        mockTime.sleep(5000); // Half of the 2 * heartbeatIntervalMs timeout
// Should still not throw exception
thread.runOnceWithoutProcessingThreads();
@@ -4243,13 +4249,13 @@ public class StreamThreadTest {
.setStatusDetail("Different missing topics")
));
- // Advance time by 250 seconds to test if timer was reset
- // Total time from beginning: 150000 + 250000 = 400000ms (400s)
- // If timer was NOT reset: elapsed time = 400s > 300s → should throw
- // If timer WAS reset: elapsed time = 250s < 300s → should NOT throw
- mockTime.sleep(250000); // Advance by 250 seconds
+ // Advance time by 6 seconds to test if timer was reset
+ // Total time from beginning: 5000 + 6000 = 11000ms (11s)
+ // If timer was NOT reset: elapsed time = 11s > 10s → should throw
+ // If timer WAS reset: elapsed time = 6s < 10s → should NOT throw
+ mockTime.sleep(6000); // Advance by 6 seconds
-        // Should not throw because timer was reset - only 250s elapsed from reset point
+        // Should not throw because timer was reset - only 6s elapsed from reset point
assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
}