This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2baad13cf1a KAFKA-19521: Fix consumption and leaving group when source 
topic is deleted (#20735)
2baad13cf1a is described below

commit 2baad13cf1a18c8dc55be3e081de0ba80fe14e2f
Author: Jinhe Zhang <[email protected]>
AuthorDate: Wed Oct 22 04:40:01 2025 -0400

    KAFKA-19521: Fix consumption and leaving group when source topic is deleted 
(#20735)
    
    Integration tests expecting `MissingSourceTopicException` were timing
    out and taking 5+ minutes because [PR
    #20284](https://github.com/apache/kafka/pull/20284) set the missing
    topics timeout to `max.poll.interval.ms` (default 300 seconds).
    
    This is inappropriate because:
    - When source topics don't exist, actual poll frequency is `poll.ms`
    (100ms), not `max.poll.interval.ms`
    - The exception should be raised much faster based on heartbeat
    frequency
    
    ## Solution
    
    Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout
    instead:
    - Ensures at least one heartbeat is sent before raising the exception
    - Falls back to `max.poll.interval.ms` for backward compatibility
    - Fixes slow integration tests
    
    ## Changes
    
    1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it
    on each heartbeat response
    2. Use `2 * heartbeatIntervalMs` as timeout in
    `StreamThread.handleMissingSourceTopicsWithTimeout()`
    3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both
    "classic" and "streams" protocols
    4. Update `StreamsRebalanceDataTest`,
    `StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test
    `StreamsRebalanceData` and its behaviour
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
     <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       |  1 +
 .../consumer/internals/StreamsRebalanceData.java   | 13 ++++++++
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 29 +++++++++++++++++
 .../internals/StreamsRebalanceDataTest.java        | 37 ++++++++++++++++++++++
 ...HandlingSourceTopicDeletionIntegrationTest.java |  9 ++++--
 .../streams/processor/internals/StreamThread.java  |  8 +++--
 .../processor/internals/StreamThreadTest.java      | 26 +++++++++------
 7 files changed, 108 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index ceeeb6c1916..cb9a38d0ddc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -527,6 +527,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         
heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
         heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
+        
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
 
         if (data.partitionsByUserEndpoint() != null) {
             streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 2fe7ae8ad35..c6fe1fd9215 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -329,6 +330,8 @@ public class StreamsRebalanceData {
 
     private final 
AtomicReference<List<StreamsGroupHeartbeatResponseData.Status>> statuses = new 
AtomicReference<>(List.of());
 
+    private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
+
     public StreamsRebalanceData(final UUID processId,
                                 final Optional<HostInfo> endpoint,
                                 final Map<String, Subtopology> subtopologies,
@@ -395,4 +398,14 @@ public class StreamsRebalanceData {
         return statuses.get();
     }
 
+    /** Updated whenever a heartbeat response is received from the broker. */
+    public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
+        this.heartbeatIntervalMs.set(heartbeatIntervalMs);
+    }
+
+    /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. 
*/
+    public int heartbeatIntervalMs() {
+        return heartbeatIntervalMs.get();
+    }
+
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index f4a2726b9e5..9e4b8437144 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -1507,6 +1507,35 @@ class StreamsGroupHeartbeatRequestManagerTest {
         }
     }
 
+    @Test
+    public void testStreamsRebalanceDataHeartbeatIntervalMsUpdatedOnSuccess() {
+        try (
+                final MockedConstruction<HeartbeatRequestState> ignored = 
mockConstruction(
+                        HeartbeatRequestState.class,
+                        (mock, context) -> 
when(mock.canSendRequest(time.milliseconds())).thenReturn(true))
+        ) {
+            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager 
= createStreamsGroupHeartbeatRequestManager();
+            
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+            when(membershipManager.groupId()).thenReturn(GROUP_ID);
+            when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+            when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+            
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+
+            // Initially, heartbeatIntervalMs should be -1
+            assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs());
+
+            final NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+            assertEquals(1, result.unsentRequests.size());
+
+            final NetworkClientDelegate.UnsentRequest networkRequest = 
result.unsentRequests.get(0);
+            final ClientResponse response = buildClientResponse();
+            networkRequest.handler().onComplete(response);
+
+            // After successful response, heartbeatIntervalMs should be updated
+            assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, 
streamsRebalanceData.heartbeatIntervalMs());
+        }
+    }
+
     private static ConsumerConfig config() {
         Properties prop = new Properties();
         prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index 606ba0b7350..f2376640c01 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -437,4 +437,41 @@ public class StreamsRebalanceDataTest {
         assertTrue(streamsRebalanceData.statuses().isEmpty());
     }
 
+    @Test
+    public void 
streamsRebalanceDataShouldBeConstructedWithHeartbeatIntervalMsSetToMinusOne() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = 
Optional.of(new
+                StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = 
Map.of();
+        final Map<String, String> clientTags = Map.of("clientTag1",
+                "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+                processId,
+                endpoint,
+                subtopologies,
+                clientTags
+        );
+
+        assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs());
+    }
+
+    @Test
+    public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() {
+        final UUID processId = UUID.randomUUID();
+        final Optional<StreamsRebalanceData.HostInfo> endpoint = 
Optional.of(new
+                StreamsRebalanceData.HostInfo("localhost", 9090));
+        final Map<String, StreamsRebalanceData.Subtopology> subtopologies = 
Map.of();
+        final Map<String, String> clientTags = Map.of("clientTag1",
+                "clientTagValue1");
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+                processId,
+                endpoint,
+                subtopologies,
+                clientTags
+        );
+
+        streamsRebalanceData.setHeartbeatIntervalMs(1000);
+        assertEquals(1000, streamsRebalanceData.heartbeatIntervalMs());
+    }
+
 }
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
index d8f9061dfdb..f31e79c5399 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
@@ -33,8 +33,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -75,8 +76,9 @@ public class HandlingSourceTopicDeletionIntegrationTest {
         CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
     }
 
-    @Test
-    public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo 
testName) throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(strings = {"classic", "streams"})
+    public void shouldThrowErrorAfterSourceTopicDeleted(final String 
groupProtocol, final TestInfo testName) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
@@ -91,6 +93,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
NUM_THREADS);
         streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+        streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol);
 
         final Topology topology = builder.build();
         final AtomicBoolean calledUncaughtExceptionHandler1 = new 
AtomicBoolean(false);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3521b31d8a3..f208567c32d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1587,11 +1587,15 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     }
 
     private void handleMissingSourceTopicsWithTimeout(final String 
missingTopicsDetail) {
+        // Using 2 * heartbeatIntervalMs as the timeout ensures at least one 
heartbeat is sent before raising the exception
+        final int heartbeatIntervalMs = 
streamsRebalanceData.get().heartbeatIntervalMs();
+        final long timeoutMs = 2L * heartbeatIntervalMs;
+
         // Start timeout tracking on first encounter with missing topics
         if (topicsReadyTimer == null) {
-            topicsReadyTimer = time.timer(maxPollTimeMs);
+            topicsReadyTimer = time.timer(timeoutMs);
             log.info("Missing source topics detected: {}. Will wait up to {}ms 
before failing.",
-                missingTopicsDetail, maxPollTimeMs);
+                missingTopicsDetail, timeoutMs);
         } else {
             topicsReadyTimer.update();
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d7f61883adb..66b90ffcc03 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3966,11 +3966,13 @@ public class StreamThreadTest {
                         .setStatusDetail("Missing source topics")
         ));
 
+        streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithoutProcessingThreads();
 
         // Advance time beyond max.poll.interval.ms (default is 300000ms) to 
trigger timeout
-        mockTime.sleep(300001);
+        mockTime.sleep(10001);
 
         final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithoutProcessingThreads());
         assertTrue(exception.getMessage().contains("Missing source topics"));
@@ -4153,11 +4155,13 @@ public class StreamThreadTest {
                         .setStatusDetail("Missing source topics")
         ));
 
+        streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithProcessingThreads();
 
-        // Advance time beyond max.poll.interval.ms (default is 300000ms) to 
trigger timeout
-        mockTime.sleep(300001);
+        // Advance time beyond the 2 * heartbeatIntervalMs timeout (2 * 5000ms = 10000ms) to 
trigger timeout
+        mockTime.sleep(10001);
 
         final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithProcessingThreads());
         assertTrue(exception.getMessage().contains("Missing source topics"));
@@ -4221,11 +4225,13 @@ public class StreamThreadTest {
                         .setStatusDetail("Missing source topics")
         ));
 
+        streamsRebalanceData.setHeartbeatIntervalMs(5000);
+
         // First call should not throw exception (within timeout)
         thread.runOnceWithoutProcessingThreads();
 
         // Advance time but not beyond timeout
-        mockTime.sleep(150000); // Half of max.poll.interval.ms
+        mockTime.sleep(5000); // Half of the 2 * heartbeatIntervalMs timeout (10000ms)
 
         // Should still not throw exception
         thread.runOnceWithoutProcessingThreads();
@@ -4243,13 +4249,13 @@ public class StreamThreadTest {
                         .setStatusDetail("Different missing topics")
         ));
 
-        // Advance time by 250 seconds to test if timer was reset
-        // Total time from beginning: 150000 + 250000 = 400000ms (400s)
-        // If timer was NOT reset: elapsed time = 400s > 300s → should throw
-        // If timer WAS reset: elapsed time = 250s < 300s → should NOT throw
-        mockTime.sleep(250000); // Advance by 250 seconds
+        // Advance time by 6 seconds to test if timer was reset
+        // Total time from beginning: 5000 + 6000 = 11000ms (11s)
+        // If timer was NOT reset: elapsed time = 11s > 10s → should throw
+        // If timer WAS reset: elapsed time = 6s < 10s → should NOT throw
+        mockTime.sleep(6000); // Advance by 6 seconds
 
-        // Should not throw because timer was reset - only 250s elapsed from 
reset point
+        // Should not throw because timer was reset - only 6s elapsed from 
reset point
         assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
     }
 

Reply via email to