This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7f71697728 KAFKA-19978: Don't enter running state as long as group is 
not ready (#21110)
c7f71697728 is described below

commit c7f7169772873cc36cc344af9b0d96034ff9c774
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Dec 11 01:22:18 2025 +0100

    KAFKA-19978: Don't enter running state as long as group is not ready 
(#21110)
    
    Since internal topics are created asynchronous, and we do not want to
    have heartbeats failing, when an application has not yet created its
    internal topics, it remains in NOT_READY, until the internal topics are
    created.
    
    The way the client-side is implemented right now, the Kafka Streams
    application will still  enter RUNNING state with an empty assignment,
    when internal topics are missing. From the outside, it will therefore
    look like the initial set-up of the application has finished, while the
    application is still waiting for the group to become ready.
    
    This propagates the state of the group as part of the assignment, so
    that  we delay entering a "RUNNING" state until the group has become
    ready.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../internals/StreamsMembershipManager.java        |  84 +++++--
 .../consumer/internals/StreamsRebalanceData.java   |  19 +-
 .../requests/StreamsGroupHeartbeatResponse.java    |  22 +-
 .../message/StreamsGroupHeartbeatResponse.json     |   1 +
 .../StreamsGroupHeartbeatRequestManagerTest.java   |   9 +-
 .../internals/StreamsMembershipManagerTest.java    | 271 ++++++++++++++++++++-
 .../internals/StreamsRebalanceDataTest.java        |  35 ++-
 .../StreamsRebalanceListenerInvokerTest.java       |  64 ++---
 .../kafka/api/AuthorizerIntegrationTest.scala      |   6 +
 .../coordinator/group/GroupMetadataManager.java    |   7 +
 .../group/GroupMetadataManagerTest.java            |   6 +-
 .../KafkaStreamsTelemetryIntegrationTest.java      |   5 +
 .../integration/MetricsIntegrationTest.java        |  32 ++-
 .../integration/RocksDBMetricsIntegrationTest.java |  13 +-
 .../internals/DefaultStreamsRebalanceListener.java |   1 +
 .../streams/processor/internals/StreamThread.java  |  32 ++-
 .../DefaultStreamsRebalanceListenerTest.java       |  14 +-
 17 files changed, 527 insertions(+), 94 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index 84ac83125be..fcb0b8ad6c6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -80,22 +80,26 @@ public class StreamsMembershipManager implements 
RequestManager {
             NONE_EPOCH,
             Collections.emptyMap(),
             Collections.emptyMap(),
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            false
         );
 
         public final long localEpoch;
         public final Map<String, SortedSet<Integer>> activeTasks;
         public final Map<String, SortedSet<Integer>> standbyTasks;
         public final Map<String, SortedSet<Integer>> warmupTasks;
+        public final boolean isGroupReady;
 
         public LocalAssignment(final long localEpoch,
                                final Map<String, SortedSet<Integer>> 
activeTasks,
                                final Map<String, SortedSet<Integer>> 
standbyTasks,
-                               final Map<String, SortedSet<Integer>> 
warmupTasks) {
+                               final Map<String, SortedSet<Integer>> 
warmupTasks,
+                               final boolean isGroupReady) {
             this.localEpoch = localEpoch;
             this.activeTasks = activeTasks;
             this.standbyTasks = standbyTasks;
             this.warmupTasks = warmupTasks;
+            this.isGroupReady = isGroupReady;
             if (localEpoch == NONE_EPOCH &&
                     (!activeTasks.isEmpty() || !standbyTasks.isEmpty() || 
!warmupTasks.isEmpty())) {
                 throw new IllegalArgumentException("Local epoch must be set if 
tasks are assigned.");
@@ -104,16 +108,19 @@ public class StreamsMembershipManager implements 
RequestManager {
 
         Optional<LocalAssignment> updateWith(final Map<String, 
SortedSet<Integer>> activeTasks,
                                              final Map<String, 
SortedSet<Integer>> standbyTasks,
-                                             final Map<String, 
SortedSet<Integer>> warmupTasks) {
+                                             final Map<String, 
SortedSet<Integer>> warmupTasks,
+                                             final boolean isGroupReady) {
             if (localEpoch != NONE_EPOCH &&
                     activeTasks.equals(this.activeTasks) &&
                     standbyTasks.equals(this.standbyTasks) &&
-                    warmupTasks.equals(this.warmupTasks)) {
+                    warmupTasks.equals(this.warmupTasks) &&
+                    isGroupReady == this.isGroupReady
+            ) {
                 return Optional.empty();
             }
 
             long nextLocalEpoch = localEpoch + 1;
-            return Optional.of(new LocalAssignment(nextLocalEpoch, 
activeTasks, standbyTasks, warmupTasks));
+            return Optional.of(new LocalAssignment(nextLocalEpoch, 
activeTasks, standbyTasks, warmupTasks, isGroupReady));
         }
 
         @Override
@@ -123,6 +130,7 @@ public class StreamsMembershipManager implements 
RequestManager {
                 ", activeTasks=" + activeTasks +
                 ", standbyTasks=" + standbyTasks +
                 ", warmupTasks=" + warmupTasks +
+                ", isGroupReady=" + isGroupReady +
                 '}';
         }
 
@@ -134,12 +142,13 @@ public class StreamsMembershipManager implements 
RequestManager {
             return localEpoch == that.localEpoch &&
                 Objects.equals(activeTasks, that.activeTasks) &&
                 Objects.equals(standbyTasks, that.standbyTasks) &&
-                Objects.equals(warmupTasks, that.warmupTasks);
+                Objects.equals(warmupTasks, that.warmupTasks) &&
+                isGroupReady == that.isGroupReady;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(localEpoch, activeTasks, standbyTasks, 
warmupTasks);
+            return Objects.hash(localEpoch, activeTasks, standbyTasks, 
warmupTasks, isGroupReady);
         }
     }
 
@@ -686,9 +695,9 @@ public class StreamsMembershipManager implements 
RequestManager {
         final List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks = 
responseData.activeTasks();
         final List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks = 
responseData.standbyTasks();
         final List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks = 
responseData.warmupTasks();
+        final boolean isGroupReady = isGroupReady(responseData.status());
 
         if (activeTasks != null && standbyTasks != null && warmupTasks != 
null) {
-
             if (!state.canHandleNewAssignment()) {
                 log.debug("Ignoring new assignment: active tasks {}, standby 
tasks {}, and warm-up tasks {} received " +
                         "from server because member is in {} state.",
@@ -699,17 +708,39 @@ public class StreamsMembershipManager implements 
RequestManager {
             processAssignmentReceived(
                 toTasksAssignment(activeTasks),
                 toTasksAssignment(standbyTasks),
-                toTasksAssignment(warmupTasks)
+                toTasksAssignment(warmupTasks),
+                isGroupReady
             );
-        } else {
-            if (responseData.activeTasks() != null ||
-                responseData.standbyTasks() != null ||
-                responseData.warmupTasks() != null) {
+        } else if (responseData.activeTasks() != null || 
responseData.standbyTasks() != null || responseData.warmupTasks() != null) {
+            throw new IllegalStateException("Invalid response data, task 
collections must be all null or all non-null: "
+                + responseData);
+        } else if (isGroupReady != targetAssignment.isGroupReady) {
+            // If the client did not provide a new assignment, but the group 
is now ready or not ready anymore, so
+            // update the target assignment and reconcile it.
+            processAssignmentReceived(
+                targetAssignment.activeTasks,
+                targetAssignment.standbyTasks,
+                targetAssignment.warmupTasks,
+                isGroupReady
+            );
+        }
+    }
 
-                throw new IllegalStateException("Invalid response data, task 
collections must be all null or all non-null: "
-                    + responseData);
+    private boolean 
isGroupReady(List<StreamsGroupHeartbeatResponseData.Status> statuses) {
+        if (statuses != null) {
+            for (final StreamsGroupHeartbeatResponseData.Status status : 
statuses) {
+                switch 
(StreamsGroupHeartbeatResponse.Status.fromCode(status.statusCode())) {
+                    case MISSING_SOURCE_TOPICS:
+                    case MISSING_INTERNAL_TOPICS:
+                    case INCORRECTLY_PARTITIONED_TOPICS:
+                    case ASSIGNMENT_DELAYED:
+                        return false;
+                    default:
+                        // continue checking other statuses
+                }
             }
         }
+        return true;
     }
 
     /**
@@ -952,11 +983,13 @@ public class StreamsMembershipManager implements 
RequestManager {
      * @param activeTasks Target active tasks assignment received from the 
broker.
      * @param standbyTasks Target standby tasks assignment received from the 
broker.
      * @param warmupTasks Target warm-up tasks assignment received from the 
broker.
+     * @param isGroupReady True if the group is ready, false otherwise.
      */
     private void processAssignmentReceived(Map<String, SortedSet<Integer>> 
activeTasks,
                                            Map<String, SortedSet<Integer>> 
standbyTasks,
-                                           Map<String, SortedSet<Integer>> 
warmupTasks) {
-        replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, 
warmupTasks);
+                                           Map<String, SortedSet<Integer>> 
warmupTasks,
+                                           boolean isGroupReady) {
+        replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, 
warmupTasks, isGroupReady);
         if (!targetAssignmentReconciled()) {
             transitionTo(MemberState.RECONCILING);
         } else {
@@ -975,8 +1008,9 @@ public class StreamsMembershipManager implements 
RequestManager {
 
     private void replaceTargetAssignmentWithNewAssignment(Map<String, 
SortedSet<Integer>> activeTasks,
                                                           Map<String, 
SortedSet<Integer>> standbyTasks,
-                                                          Map<String, 
SortedSet<Integer>> warmupTasks) {
-        targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks)
+                                                          Map<String, 
SortedSet<Integer>> warmupTasks,
+                                                          boolean 
isGroupReady) {
+        targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks, 
isGroupReady)
             .ifPresent(updatedAssignment -> {
                 log.debug("Target assignment updated from {} to {}. Member 
will reconcile it on the next poll.",
                     targetAssignment, updatedAssignment);
@@ -1025,8 +1059,9 @@ public class StreamsMembershipManager implements 
RequestManager {
         SortedSet<StreamsRebalanceData.TaskId> ownedStandbyTasks = 
toTaskIdSet(currentAssignment.standbyTasks);
         SortedSet<StreamsRebalanceData.TaskId> assignedWarmupTasks = 
toTaskIdSet(targetAssignment.warmupTasks);
         SortedSet<StreamsRebalanceData.TaskId> ownedWarmupTasks = 
toTaskIdSet(currentAssignment.warmupTasks);
+        boolean isGroupReady = targetAssignment.isGroupReady;
 
-        log.info("Assigned tasks with local epoch {}\n" +
+        log.info("Assigned tasks with local epoch {} and group {}\n" +
                 "\tMember:                        {}\n" +
                 "\tAssigned active tasks:         {}\n" +
                 "\tOwned active tasks:            {}\n" +
@@ -1036,6 +1071,7 @@ public class StreamsMembershipManager implements 
RequestManager {
                 "\tAssigned warm-up tasks:        {}\n" +
                 "\tOwned warm-up tasks:           {}\n",
             targetAssignment.localEpoch,
+            isGroupReady ? "is ready" : "is not ready",
             memberId,
             assignedActiveTasks,
             ownedActiveTasks,
@@ -1064,7 +1100,7 @@ public class StreamsMembershipManager implements 
RequestManager {
 
         final CompletableFuture<Void> tasksRevokedAndAssigned = 
tasksRevoked.thenCompose(__ -> {
             if (!maybeAbortReconciliation()) {
-                return assignTasks(assignedActiveTasks, ownedActiveTasks, 
assignedStandbyTasks, assignedWarmupTasks);
+                return assignTasks(assignedActiveTasks, ownedActiveTasks, 
assignedStandbyTasks, assignedWarmupTasks, isGroupReady);
             }
             return CompletableFuture.completedFuture(null);
         });
@@ -1117,7 +1153,8 @@ public class StreamsMembershipManager implements 
RequestManager {
     private CompletableFuture<Void> assignTasks(final 
SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign,
                                                 final 
SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks,
                                                 final 
SortedSet<StreamsRebalanceData.TaskId> standbyTasksToAssign,
-                                                final 
SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign) {
+                                                final 
SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign,
+                                                final boolean isGroupReady) {
         log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up 
tasks {{}} to the member.",
             activeTasksToAssign.stream()
                 .map(StreamsRebalanceData.TaskId::toString)
@@ -1145,7 +1182,8 @@ public class StreamsMembershipManager implements 
RequestManager {
                 new StreamsRebalanceData.Assignment(
                     activeTasksToAssign,
                     standbyTasksToAssign,
-                    warmupTasksToAssign
+                    warmupTasksToAssign,
+                    isGroupReady
                 )
             );
         onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index c6fe1fd9215..ac3c53c47d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -157,18 +157,23 @@ public class StreamsRebalanceData {
 
         private final Set<TaskId> warmupTasks;
 
+        private final boolean isGroupReady;
+
         private Assignment() {
             this.activeTasks = Set.of();
             this.standbyTasks = Set.of();
             this.warmupTasks = Set.of();
+            this.isGroupReady = false;
         }
 
         public Assignment(final Set<TaskId> activeTasks,
                           final Set<TaskId> standbyTasks,
-                          final Set<TaskId> warmupTasks) {
+                          final Set<TaskId> warmupTasks,
+                          final boolean isGroupReady) {
             this.activeTasks = Set.copyOf(Objects.requireNonNull(activeTasks, 
"Active tasks cannot be null"));
             this.standbyTasks = 
Set.copyOf(Objects.requireNonNull(standbyTasks, "Standby tasks cannot be 
null"));
             this.warmupTasks = Set.copyOf(Objects.requireNonNull(warmupTasks, 
"Warmup tasks cannot be null"));
+            this.isGroupReady = isGroupReady;
         }
 
         public Set<TaskId> activeTasks() {
@@ -183,6 +188,10 @@ public class StreamsRebalanceData {
             return warmupTasks;
         }
 
+        public boolean isGroupReady() {
+            return isGroupReady;
+        }
+
         @Override
         public boolean equals(final Object o) {
             if (this == o) {
@@ -194,16 +203,17 @@ public class StreamsRebalanceData {
             final Assignment that = (Assignment) o;
             return Objects.equals(activeTasks, that.activeTasks)
                 && Objects.equals(standbyTasks, that.standbyTasks)
-                && Objects.equals(warmupTasks, that.warmupTasks);
+                && Objects.equals(warmupTasks, that.warmupTasks)
+                && isGroupReady == that.isGroupReady;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(activeTasks, standbyTasks, warmupTasks);
+            return Objects.hash(activeTasks, standbyTasks, warmupTasks, 
isGroupReady);
         }
 
         public Assignment copy() {
-            return new Assignment(activeTasks, standbyTasks, warmupTasks);
+            return new Assignment(activeTasks, standbyTasks, warmupTasks, 
isGroupReady);
         }
 
         @Override
@@ -212,6 +222,7 @@ public class StreamsRebalanceData {
                 "activeTasks=" + activeTasks +
                 ", standbyTasks=" + standbyTasks +
                 ", warmupTasks=" + warmupTasks +
+                ", isGroupReady=" + isGroupReady +
                 '}';
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
index 32fe55f12cd..87c10a98d37 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -81,7 +82,18 @@ public class StreamsGroupHeartbeatResponse extends 
AbstractResponse {
         MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing 
or a source topic regex resolves to zero topics."),
         INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected 
to be copartitioned are not copartitioned."),
         MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are 
missing."),
-        SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the 
whole application.");
+        SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the 
whole application."),
+        ASSIGNMENT_DELAYED((byte) 5, "The assignment was delayed by the 
coordinator.");
+
+        private static final Map<Byte, Status> CODE_TO_STATUS;
+
+        static {
+            Map<Byte, Status> map = new HashMap<>();
+            for (Status status : values()) {
+                map.put(status.code, status);
+            }
+            CODE_TO_STATUS = Collections.unmodifiableMap(map);
+        }
 
         private final byte code;
         private final String message;
@@ -98,5 +110,13 @@ public class StreamsGroupHeartbeatResponse extends 
AbstractResponse {
         public String message() {
             return message;
         }
+
+        public static Status fromCode(byte code) {
+            Status status = CODE_TO_STATUS.get(code);
+            if (status == null) {
+                throw new IllegalArgumentException("Unknown code " + code);
+            }
+            return status;
+        }
     }
 }
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 220c9704a23..27cf47bb1a4 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -94,6 +94,7 @@
       //                                       The group coordinator will 
attempt to create all missing internal topics, if any errors occur during
       //                                       topic creation, this will be 
indicated in StatusDetail.
       //  4 - SHUTDOWN_APPLICATION           - A client requested the shutdown 
of the whole application.
+      //  5 - ASSIGNMENT_DELAYED             - No assignment was provided 
because assignment computation was delayed.
       { "name": "StatusCode", "type": "int8", "versions": "0+",
         "about": "A code to indicate that a particular status is active for 
the group membership" },
       { "name": "StatusDetail", "type": "string", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 9e4b8437144..35392e99398 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -821,7 +821,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
-                )
+                ),
+                true
             )
         );
 
@@ -870,7 +871,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
                 ),
                 Set.of(
-                )
+                ),
+                true
             )
         );
 
@@ -923,7 +925,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
                     new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
-                )
+                ),
+                true
             )
         );
         StreamsGroupHeartbeatRequestData requestDataBeforeReset = 
heartbeatState.buildRequestData();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index c0255a4d29a..50e582f5cc4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -72,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1937,6 +1938,242 @@ public class StreamsMembershipManagerTest {
         verifyInStateUnsubscribed(membershipManager);
     }
 
+    @Test
+    public void testIsGroupReadyWithMissingSourceTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                .setStatusDetail("One or more source topics are missing.")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithMissingInternalTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+                .setStatusDetail("One or more internal topics are missing.")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithIncorrectlyPartitionedTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
+                .setStatusDetail("One or more topics expected to be 
copartitioned are not copartitioned.")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithAssignmentDelayedStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+                .setStatusDetail("Assignment delayed due to the configured 
initial rebalance delay.")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithNoStatuses() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            null
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            true
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithOtherStatuses() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+                .setStatusDetail("The topology epoch supplied is inconsistent 
with the topology for this streams group.")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            true
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyChangeWhenTasksAreNull() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final StreamsGroupHeartbeatResponse responseWithTasks = 
makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            null
+        );
+
+        membershipManager.onHeartbeatSuccess(responseWithTasks);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future1 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            true
+        );
+        future1.complete(null);
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+                .setStatusDetail("Assignment delayed due to the configured 
initial rebalance delay.")
+        );
+
+        final StreamsGroupHeartbeatResponse responseWithoutTasks = 
makeHeartbeatResponse(
+            null,
+            null,
+            null,
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(responseWithoutTasks);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future2 = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+        future2.complete(null);
+    }
+
     private void verifyThatNoTasksHaveBeenRevoked() {
         verify(backgroundEventHandler, 
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
         verify(subscriptionState, never()).markPendingRevocation(any());
@@ -2036,9 +2273,16 @@ public class StreamsMembershipManagerTest {
     private CompletableFuture<Void> 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
                                                                                
                           final Set<StreamsRebalanceData.TaskId> standbyTasks,
                                                                                
                           final Set<StreamsRebalanceData.TaskId> warmupTasks) {
+        return 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
 standbyTasks, warmupTasks, true);
+    }
+
+    private CompletableFuture<Void> 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
+                                                                               
                           final Set<StreamsRebalanceData.TaskId> standbyTasks,
+                                                                               
                           final Set<StreamsRebalanceData.TaskId> warmupTasks,
+                                                                               
                           final boolean isGroupReady) {
         verify(backgroundEventHandler, 
times(++onTasksAssignedCallbackNeededAddCount)).add(onTasksAssignedCallbackNeededEventCaptor.capture());
         final StreamsOnTasksAssignedCallbackNeededEvent 
onTasksAssignedCallbackNeeded = 
onTasksAssignedCallbackNeededEventCaptor.getValue();
-        assertEquals(makeTaskAssignment(activeTasks, standbyTasks, 
warmupTasks), onTasksAssignedCallbackNeeded.assignment());
+        assertEquals(makeTaskAssignment(activeTasks, standbyTasks, 
warmupTasks, isGroupReady), onTasksAssignedCallbackNeeded.assignment());
         return onTasksAssignedCallbackNeeded.future();
     }
 
@@ -2072,7 +2316,7 @@ public class StreamsMembershipManagerTest {
 
     private void 
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(final String 
subtopologyId,
                                                                            
final String topicName) {
-        when(streamsRebalanceData.subtopologies()).thenReturn(
+        lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
             mkMap(
                 mkEntry(
                     subtopologyId,
@@ -2092,7 +2336,7 @@ public class StreamsMembershipManagerTest {
                                                                 final String 
topicName1,
                                                                 final String 
subtopologyId2,
                                                                 final String 
topicName2) {
-        when(streamsRebalanceData.subtopologies()).thenReturn(
+        lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
             mkMap(
                 mkEntry(
                     subtopologyId1,
@@ -2199,6 +2443,14 @@ public class StreamsMembershipManagerTest {
                                                                 final 
List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks,
                                                                 final 
List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks,
                                                                 final int 
memberEpoch) {
+        return makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks, 
memberEpoch, null);
+    }
+
+    private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final 
List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks,
+                                                                final 
List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks,
+                                                                final 
List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks,
+                                                                final int 
memberEpoch,
+                                                                final 
List<StreamsGroupHeartbeatResponseData.Status> statuses) {
         final StreamsGroupHeartbeatResponseData responseData = new 
StreamsGroupHeartbeatResponseData()
             .setErrorCode(Errors.NONE.code())
             .setMemberId(membershipManager.memberId())
@@ -2206,16 +2458,27 @@ public class StreamsMembershipManagerTest {
             .setActiveTasks(activeTasks)
             .setStandbyTasks(standbyTasks)
             .setWarmupTasks(warmupTasks);
+        if (statuses != null) {
+            responseData.setStatus(statuses);
+        }
         return new StreamsGroupHeartbeatResponse(responseData);
     }
 
     private StreamsRebalanceData.Assignment makeTaskAssignment(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
                                                                final 
Set<StreamsRebalanceData.TaskId> standbyTasks,
                                                                final 
Set<StreamsRebalanceData.TaskId> warmupTasks) {
+        return makeTaskAssignment(activeTasks, standbyTasks, warmupTasks, 
true);
+    }
+
+    private StreamsRebalanceData.Assignment makeTaskAssignment(final 
Set<StreamsRebalanceData.TaskId> activeTasks,
+                                                               final 
Set<StreamsRebalanceData.TaskId> standbyTasks,
+                                                               final 
Set<StreamsRebalanceData.TaskId> warmupTasks,
+                                                               final boolean 
isGroupReady) {
         return new StreamsRebalanceData.Assignment(
             activeTasks,
             standbyTasks,
-            warmupTasks
+            warmupTasks,
+            isGroupReady
         );
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index f2376640c01..237dd4f0993 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -92,7 +92,8 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData.Assignment assignment = new 
StreamsRebalanceData.Assignment(
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
-            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+            true
         );
 
         assertThrows(
@@ -111,11 +112,11 @@ public class StreamsRebalanceDataTest {
 
     @Test
     public void assignmentShouldNotAcceptNulls() {
-        final Exception exception1 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of()));
+        final Exception exception1 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of(), true));
         assertEquals("Active tasks cannot be null", exception1.getMessage());
-        final Exception exception2 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of()));
+        final Exception exception2 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of(), true));
         assertEquals("Standby tasks cannot be null", exception2.getMessage());
-        final Exception exception3 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null));
+        final Exception exception3 = assertThrows(NullPointerException.class, 
() -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null, true));
         assertEquals("Warmup tasks cannot be null", exception3.getMessage());
     }
 
@@ -125,43 +126,56 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData.Assignment assignment = new 
StreamsRebalanceData.Assignment(
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
-            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+            true
         );
         final StreamsRebalanceData.Assignment assignmentEqual = new 
StreamsRebalanceData.Assignment(
             assignment.activeTasks(),
             assignment.standbyTasks(),
-            assignment.warmupTasks()
+            assignment.warmupTasks(),
+            assignment.isGroupReady()
         );
         Set<StreamsRebalanceData.TaskId> unequalActiveTasks = new 
HashSet<>(assignment.activeTasks());
         unequalActiveTasks.add(additionalTask);
         final StreamsRebalanceData.Assignment assignmentUnequalActiveTasks = 
new StreamsRebalanceData.Assignment(
             unequalActiveTasks,
             assignment.standbyTasks(),
-            assignment.warmupTasks()
+            assignment.warmupTasks(),
+            assignment.isGroupReady()
         );
         Set<StreamsRebalanceData.TaskId> unequalStandbyTasks = new 
HashSet<>(assignment.standbyTasks());
         unequalStandbyTasks.add(additionalTask);
         final StreamsRebalanceData.Assignment assignmentUnequalStandbyTasks = 
new StreamsRebalanceData.Assignment(
             assignment.activeTasks(),
             unequalStandbyTasks,
-            assignment.warmupTasks()
+            assignment.warmupTasks(),
+            assignment.isGroupReady()
         );
         Set<StreamsRebalanceData.TaskId> unequalWarmupTasks = new 
HashSet<>(assignment.warmupTasks());
         unequalWarmupTasks.add(additionalTask);
         final StreamsRebalanceData.Assignment assignmentUnequalWarmupTasks = 
new StreamsRebalanceData.Assignment(
             assignment.activeTasks(),
             assignment.standbyTasks(),
-            unequalWarmupTasks
+            unequalWarmupTasks,
+            assignment.isGroupReady()
+        );
+        final StreamsRebalanceData.Assignment assignmentUnequalIsGroupReady = 
new StreamsRebalanceData.Assignment(
+            assignment.activeTasks(),
+            assignment.standbyTasks(),
+            assignment.warmupTasks(),
+            !assignment.isGroupReady()
         );
 
         assertEquals(assignment, assignmentEqual);
         assertNotEquals(assignment, assignmentUnequalActiveTasks);
         assertNotEquals(assignment, assignmentUnequalStandbyTasks);
         assertNotEquals(assignment, assignmentUnequalWarmupTasks);
+        assertNotEquals(assignment, assignmentUnequalIsGroupReady);
         assertEquals(assignment.hashCode(), assignmentEqual.hashCode());
         assertNotEquals(assignment.hashCode(), 
assignmentUnequalActiveTasks.hashCode());
         assertNotEquals(assignment.hashCode(), 
assignmentUnequalStandbyTasks.hashCode());
         assertNotEquals(assignment.hashCode(), 
assignmentUnequalWarmupTasks.hashCode());
+        assertNotEquals(assignment.hashCode(), 
assignmentUnequalIsGroupReady.hashCode());
     }
 
     @Test
@@ -169,7 +183,8 @@ public class StreamsRebalanceDataTest {
         final StreamsRebalanceData.Assignment assignment = new 
StreamsRebalanceData.Assignment(
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
             Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
-            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+            Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+            true
         );
 
         final StreamsRebalanceData.Assignment copy = assignment.copy();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
index 749a4594ab8..b7c7275df58 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
@@ -60,7 +60,7 @@ public class StreamsRebalanceListenerInvokerTest {
 
     @Test
     public void testSetRebalanceListenerWithNull() {
-        NullPointerException exception = 
assertThrows(NullPointerException.class, 
+        NullPointerException exception = 
assertThrows(NullPointerException.class,
             () -> invoker.setRebalanceListener(null));
         assertEquals("StreamsRebalanceListener cannot be null", 
exception.getMessage());
     }
@@ -96,12 +96,12 @@ public class StreamsRebalanceListenerInvokerTest {
     @Test
     public void testInvokeAllTasksRevokedWithListener() {
         invoker.setRebalanceListener(mockListener);
-        
+
         StreamsRebalanceData.Assignment mockAssignment = 
createMockAssignment();
         
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
 
         Exception result = invoker.invokeAllTasksRevoked();
-        
+
         assertNull(result);
         verify(mockListener).onTasksRevoked(eq(mockAssignment.activeTasks()));
     }
@@ -112,7 +112,7 @@ public class StreamsRebalanceListenerInvokerTest {
         StreamsRebalanceData.Assignment assignment = createMockAssignment();
 
         Exception result = invoker.invokeTasksAssigned(assignment);
-        
+
         assertNull(result);
         verify(mockListener).onTasksAssigned(eq(assignment));
     }
@@ -123,10 +123,10 @@ public class StreamsRebalanceListenerInvokerTest {
         StreamsRebalanceData.Assignment assignment = createMockAssignment();
         WakeupException wakeupException = new WakeupException();
         
doThrow(wakeupException).when(mockListener).onTasksAssigned(assignment);
-        
-        WakeupException thrownException = assertThrows(WakeupException.class, 
+
+        WakeupException thrownException = assertThrows(WakeupException.class,
             () -> invoker.invokeTasksAssigned(assignment));
-        
+
         assertEquals(wakeupException, thrownException);
         verify(mockListener).onTasksAssigned(eq(assignment));
     }
@@ -137,10 +137,10 @@ public class StreamsRebalanceListenerInvokerTest {
         StreamsRebalanceData.Assignment assignment = createMockAssignment();
         InterruptException interruptException = new InterruptException("Test 
interrupt");
         
doThrow(interruptException).when(mockListener).onTasksAssigned(assignment);
-        
-        InterruptException thrownException = 
assertThrows(InterruptException.class, 
+
+        InterruptException thrownException = 
assertThrows(InterruptException.class,
             () -> invoker.invokeTasksAssigned(assignment));
-        
+
         assertEquals(interruptException, thrownException);
         verify(mockListener).onTasksAssigned(eq(assignment));
     }
@@ -151,9 +151,9 @@ public class StreamsRebalanceListenerInvokerTest {
         StreamsRebalanceData.Assignment assignment = createMockAssignment();
         RuntimeException runtimeException = new RuntimeException("Test 
exception");
         
doThrow(runtimeException).when(mockListener).onTasksAssigned(assignment);
-        
+
         Exception result = invoker.invokeTasksAssigned(assignment);
-        
+
         assertEquals(runtimeException, result);
         verify(mockListener).onTasksAssigned(eq(assignment));
     }
@@ -164,7 +164,7 @@ public class StreamsRebalanceListenerInvokerTest {
         Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
 
         Exception result = invoker.invokeTasksRevoked(tasks);
-        
+
         assertNull(result);
         verify(mockListener).onTasksRevoked(eq(tasks));
     }
@@ -175,10 +175,10 @@ public class StreamsRebalanceListenerInvokerTest {
         Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
         WakeupException wakeupException = new WakeupException();
         doThrow(wakeupException).when(mockListener).onTasksRevoked(tasks);
-        
-        WakeupException thrownException = assertThrows(WakeupException.class, 
+
+        WakeupException thrownException = assertThrows(WakeupException.class,
             () -> invoker.invokeTasksRevoked(tasks));
-        
+
         assertEquals(wakeupException, thrownException);
         verify(mockListener).onTasksRevoked(eq(tasks));
     }
@@ -189,10 +189,10 @@ public class StreamsRebalanceListenerInvokerTest {
         Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
         InterruptException interruptException = new InterruptException("Test 
interrupt");
         doThrow(interruptException).when(mockListener).onTasksRevoked(tasks);
-        
-        InterruptException thrownException = 
assertThrows(InterruptException.class, 
+
+        InterruptException thrownException = 
assertThrows(InterruptException.class,
             () -> invoker.invokeTasksRevoked(tasks));
-        
+
         assertEquals(interruptException, thrownException);
         verify(mockListener).onTasksRevoked(eq(tasks));
     }
@@ -203,9 +203,9 @@ public class StreamsRebalanceListenerInvokerTest {
         Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
         RuntimeException runtimeException = new RuntimeException("Test 
exception");
         doThrow(runtimeException).when(mockListener).onTasksRevoked(tasks);
-        
+
         Exception result = invoker.invokeTasksRevoked(tasks);
-        
+
         assertEquals(runtimeException, result);
         verify(mockListener).onTasksRevoked(eq(tasks));
     }
@@ -215,7 +215,7 @@ public class StreamsRebalanceListenerInvokerTest {
         invoker.setRebalanceListener(mockListener);
 
         Exception result = invoker.invokeAllTasksLost();
-        
+
         assertNull(result);
         verify(mockListener).onAllTasksLost();
     }
@@ -225,10 +225,10 @@ public class StreamsRebalanceListenerInvokerTest {
         invoker.setRebalanceListener(mockListener);
         WakeupException wakeupException = new WakeupException();
         doThrow(wakeupException).when(mockListener).onAllTasksLost();
-        
-        WakeupException thrownException = assertThrows(WakeupException.class, 
+
+        WakeupException thrownException = assertThrows(WakeupException.class,
             () -> invoker.invokeAllTasksLost());
-        
+
         assertEquals(wakeupException, thrownException);
         verify(mockListener).onAllTasksLost();
     }
@@ -238,10 +238,10 @@ public class StreamsRebalanceListenerInvokerTest {
         invoker.setRebalanceListener(mockListener);
         InterruptException interruptException = new InterruptException("Test 
interrupt");
         doThrow(interruptException).when(mockListener).onAllTasksLost();
-        
-        InterruptException thrownException = 
assertThrows(InterruptException.class, 
+
+        InterruptException thrownException = 
assertThrows(InterruptException.class,
             () -> invoker.invokeAllTasksLost());
-        
+
         assertEquals(interruptException, thrownException);
         verify(mockListener).onAllTasksLost();
     }
@@ -251,9 +251,9 @@ public class StreamsRebalanceListenerInvokerTest {
         invoker.setRebalanceListener(mockListener);
         RuntimeException runtimeException = new RuntimeException("Test 
exception");
         doThrow(runtimeException).when(mockListener).onAllTasksLost();
-        
+
         Exception result = invoker.invokeAllTasksLost();
-        
+
         assertEquals(runtimeException, result);
         verify(mockListener).onAllTasksLost();
     }
@@ -262,8 +262,8 @@ public class StreamsRebalanceListenerInvokerTest {
         Set<StreamsRebalanceData.TaskId> activeTasks = createMockTasks();
         Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of();
         Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of();
-        
-        return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, 
warmupTasks);
+
+        return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, 
warmupTasks, true);
     }
 
     private Set<StreamsRebalanceData.TaskId> createMockTasks() {
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d7a54ec2e2..5ce46083341 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3847,6 +3847,9 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     val response = sendRequestAndVerifyResponseError(request, resource, 
isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse]
     assertEquals(
       util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+        .setStatusDetail("Assignment delayed due to the configured initial 
rebalance delay."),
+        new StreamsGroupHeartbeatResponseData.Status()
         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
         .setStatusDetail("Internal topics are missing: [topic]; Unauthorized 
to CREATE on topics topic.")),
     response.data().status())
@@ -3878,6 +3881,9 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     // Request successful, and no internal topic creation error.
     assertEquals(
       util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+        .setStatusDetail("Assignment delayed due to the configured initial 
rebalance delay."),
+        new StreamsGroupHeartbeatResponseData.Status()
         
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
         .setStatusDetail("Internal topics are missing: [topic]")),
       response.data().status())
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index b0e62b49899..77b47dc00d5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2068,6 +2068,12 @@ public class GroupMetadataManager {
                 // During initial rebalance delay, return empty assignment to 
first joining members.
                 targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
                 targetAssignment = TasksTuple.EMPTY;
+
+                returnedStatus.add(
+                    new Status()
+                        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+                        .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")
+                );
             } else {
                 targetAssignment = updateStreamsTargetAssignment(
                     group,
@@ -2155,6 +2161,7 @@ public class GroupMetadataManager {
         ));
 
         response.setStatus(returnedStatus);
+
         return new CoordinatorResult<>(records, new 
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 213f16affa4..769b282403c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -17581,7 +17581,11 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setPartitionsByUserEndpoint(null)
                 .setEndpointInformationEpoch(0)
-                .setStatus(List.of()),
+                .setStatus(List.of(
+                    new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(Status.ASSIGNMENT_DELAYED.code())
+                        .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")
+                )),
             result.response().data()
         );
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 121b6d5217a..df993ac81ad 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -247,6 +247,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
                         return "org.apache.kafka." + group + "." + name;
                     }).filter(name -> 
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter 
filters out string metrics
                     .sorted().toList();
+            TestUtils.waitForCondition(
+                () -> 
TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).size()
 == expectedMetrics.size(),
+                30_000,
+                "Never received enough metrics"
+            );
             final List<String> actualMetrics = new 
ArrayList<>(TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
             assertEquals(expectedMetrics, actualMetrics);
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index a56723abc33..b31cee581b0 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
@@ -48,15 +49,17 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -339,8 +342,13 @@ public class MetricsIntegrationTest {
             () -> "Kafka Streams application did not reach state NOT_RUNNING 
in " + timeout + " ms");
     }
 
-    @Test
-    public void shouldAddMetricsOnAllLevels() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldAddMetricsOnAllLevels(final boolean 
streamsProtocolEnabled) throws Exception {
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
+
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), 
Serdes.String()));
         builder.table(STREAM_OUTPUT_1,
@@ -373,8 +381,13 @@ public class MetricsIntegrationTest {
         checkMetricsDeregistration();
     }
 
-    @Test
-    public void shouldAddMetricsForWindowStoreAndSuppressionBuffer() throws 
Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final 
boolean streamsProtocolEnabled) throws Exception {
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
+
         final Duration windowSize = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .groupByKey()
@@ -401,8 +414,13 @@ public class MetricsIntegrationTest {
         checkMetricsDeregistration();
     }
 
-    @Test
-    public void shouldAddMetricsForSessionStore() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldAddMetricsForSessionStore(final boolean 
streamsProtocolEnabled) throws Exception {
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
+
         final Duration inactivityGap = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .groupByKey()
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 54aa5dd5f0d..d77355831bc 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -42,15 +43,17 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -142,9 +145,13 @@ public class RocksDBMetricsIntegrationTest {
         void verify(final KafkaStreams kafkaStreams, final String metricScope) 
throws Exception;
     }
 
-    @Test
-    public void 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final TestInfo 
testInfo) throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean 
streamsProtocolEnabled, final TestInfo testInfo) throws Exception {
         final Properties streamsConfiguration = streamsConfig(testInfo);
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
         final StreamsBuilder builder = builderForStateStores();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
index 41accecb1b1..1cdfcba6810 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
@@ -96,6 +96,7 @@ public class DefaultStreamsRebalanceListener implements 
StreamsRebalanceListener
         log.info("Processing new assignment {} from Streams Rebalance 
Protocol", assignment);
 
         try {
+            streamThread.setStreamsGroupReady(assignment.isGroupReady());
             taskManager.handleAssignment(activeTasksWithPartitions, 
standbyTasksWithPartitions);
             streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
             taskManager.handleRebalanceComplete();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 84c4f51734a..405ea720b6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -339,6 +339,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private long lastPurgeMs;
     private long lastPartitionAssignedMs = -1L;
     private int numIterations;
+    private boolean streamsGroupReady = false;
     private volatile State state = State.CREATED;
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
@@ -1163,7 +1164,15 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         pollLatency = pollPhase();
         totalPolledSinceLastSummary += 1;
 
-        handleStreamsRebalanceData();
+        if (streamsRebalanceData.isPresent()) {
+            // Always handle status codes (e.g., MISSING_SOURCE_TOPICS, 
INCORRECTLY_PARTITIONED_TOPICS)
+            // regardless of streamsGroupReady, as these may throw exceptions 
that need to be handled.
+            handleStreamsRebalanceData();
+
+            if (!streamsGroupReady) {
+                return;
+            }
+        }
 
         // Shutdown hook could potentially be triggered and transit the thread 
state to PENDING_SHUTDOWN during #pollRequests().
         // The task manager internal states could be uninitialized if the 
state transition happens during #onPartitionsAssigned().
@@ -1307,7 +1316,15 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         taskManager.resumePollingForPartitionsWithAvailableSpace();
         pollLatency = pollPhase();
 
-        handleStreamsRebalanceData();
+        if (streamsRebalanceData.isPresent()) {
+            // Always handle status codes (e.g., MISSING_SOURCE_TOPICS, 
INCORRECTLY_PARTITIONED_TOPICS)
+            // regardless of streamsGroupReady, as these may throw exceptions 
that need to be handled.
+            handleStreamsRebalanceData();
+
+            if (!streamsGroupReady) {
+                return;
+            }
+        }
 
         // Shutdown hook could potentially be triggered and transit the thread 
state to PENDING_SHUTDOWN during #pollRequests().
         // The task manager internal states could be uninitialized if the 
state transition happens during #onPartitionsAssigned().
@@ -1487,6 +1504,17 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         return records;
     }
 
+    /**
+     * Sets the readiness state of the Streams group for this thread.
+     *
+     * @param ready {@code true} if the Streams group is ready to process 
records; {@code false} otherwise.
+     *              When set to {@code true}, this thread may transition to an 
active processing state.
+     *              When set to {@code false}, the thread will not process 
records until the group is ready.
+     */
+    public void setStreamsGroupReady(final boolean ready) {
+        streamsGroupReady = ready;
+    }
+
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
             boolean hasMissingSourceTopics = false;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
index 0e9e013f628..81103b3a2ea 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
@@ -173,12 +173,14 @@ public class DefaultStreamsRebalanceListenerTest {
         final StreamsRebalanceData.Assignment assignment = new 
StreamsRebalanceData.Assignment(
             Set.of(new StreamsRebalanceData.TaskId("1", 0)),
             Set.of(new StreamsRebalanceData.TaskId("2", 0)),
-            Set.of(new StreamsRebalanceData.TaskId("3", 0))
+            Set.of(new StreamsRebalanceData.TaskId("3", 0)),
+            false
         );
 
         assertDoesNotThrow(() -> 
defaultStreamsRebalanceListener.onTasksAssigned(assignment));
 
         final InOrder inOrder = inOrder(taskManager, streamThread, 
streamsRebalanceData);
+        inOrder.verify(streamThread).setStreamsGroupReady(false);
         inOrder.verify(taskManager).handleAssignment(
             Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), 
new TopicPartition("repartition1", 0))),
             Map.of(
@@ -201,10 +203,11 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(streamsRebalanceData);
 
         final Exception actualException = assertThrows(RuntimeException.class, 
() -> defaultStreamsRebalanceListener.onTasksAssigned(
-            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), 
false)
         ));
 
         assertEquals(exception, actualException);
+        verify(streamThread).setStreamsGroupReady(false);
         verify(taskManager).handleAssignment(any(), any());
         verify(streamThread, 
never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
         verify(taskManager, never()).handleRebalanceComplete();
@@ -302,11 +305,13 @@ public class DefaultStreamsRebalanceListenerTest {
             new StreamsRebalanceData.Assignment(
                 Set.of(new StreamsRebalanceData.TaskId("1", 0)),
                 Set.of(),
-                Set.of()
+                Set.of(),
+                true
             )
         );
 
         verify(tasksAssignedSensor).record(150L);
+        verify(streamThread).setStreamsGroupReady(true);
         verify(taskManager).handleAssignment(
             Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), 
new TopicPartition("repartition1", 0))),
             Map.of()
@@ -376,10 +381,11 @@ public class DefaultStreamsRebalanceListenerTest {
         createRebalanceListenerWithRebalanceData(new 
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
 
         assertThrows(RuntimeException.class, () -> 
defaultStreamsRebalanceListener.onTasksAssigned(
-            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+            new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), 
false)
         ));
 
         verify(tasksAssignedSensor).record(75L);
+        verify(streamThread).setStreamsGroupReady(false);
         verify(taskManager).handleAssignment(any(), any());
     }
 

Reply via email to