This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d62139ff9ef4628c881983cf0a4faa3d0e903f19 Author: Lucas Brutschy <[email protected]> AuthorDate: Thu Dec 11 01:22:18 2025 +0100 KAFKA-19978: Don't enter running state as long as group is not ready (#21110) Since internal topics are created asynchronously, and we do not want heartbeats to fail when an application has not yet created its internal topics, the group remains in NOT_READY until the internal topics are created. The way the client side is implemented right now, the Kafka Streams application will still enter RUNNING state with an empty assignment when internal topics are missing. From the outside, it will therefore look like the initial set-up of the application has finished, while the application is still waiting for the group to become ready. This propagates the state of the group as part of the assignment, so that we delay entering a "RUNNING" state until the group has become ready. Reviewers: Matthias J. Sax <[email protected]> --- .../internals/StreamsMembershipManager.java | 84 +++++-- .../consumer/internals/StreamsRebalanceData.java | 19 +- .../requests/StreamsGroupHeartbeatResponse.java | 22 +- .../message/StreamsGroupHeartbeatResponse.json | 1 + .../StreamsGroupHeartbeatRequestManagerTest.java | 9 +- .../internals/StreamsMembershipManagerTest.java | 271 ++++++++++++++++++++- .../internals/StreamsRebalanceDataTest.java | 35 ++- .../StreamsRebalanceListenerInvokerTest.java | 64 ++--- .../kafka/api/AuthorizerIntegrationTest.scala | 6 + .../coordinator/group/GroupMetadataManager.java | 7 + .../group/GroupMetadataManagerTest.java | 6 +- .../KafkaStreamsTelemetryIntegrationTest.java | 5 + .../integration/MetricsIntegrationTest.java | 32 ++- .../integration/RocksDBMetricsIntegrationTest.java | 13 +- .../internals/DefaultStreamsRebalanceListener.java | 1 + .../streams/processor/internals/StreamThread.java | 32 ++- .../DefaultStreamsRebalanceListenerTest.java | 14 +- 17 files changed, 527 insertions(+), 94 deletions(-) diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java index 84ac83125be..fcb0b8ad6c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java @@ -80,22 +80,26 @@ public class StreamsMembershipManager implements RequestManager { NONE_EPOCH, Collections.emptyMap(), Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap(), + false ); public final long localEpoch; public final Map<String, SortedSet<Integer>> activeTasks; public final Map<String, SortedSet<Integer>> standbyTasks; public final Map<String, SortedSet<Integer>> warmupTasks; + public final boolean isGroupReady; public LocalAssignment(final long localEpoch, final Map<String, SortedSet<Integer>> activeTasks, final Map<String, SortedSet<Integer>> standbyTasks, - final Map<String, SortedSet<Integer>> warmupTasks) { + final Map<String, SortedSet<Integer>> warmupTasks, + final boolean isGroupReady) { this.localEpoch = localEpoch; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.warmupTasks = warmupTasks; + this.isGroupReady = isGroupReady; if (localEpoch == NONE_EPOCH && (!activeTasks.isEmpty() || !standbyTasks.isEmpty() || !warmupTasks.isEmpty())) { throw new IllegalArgumentException("Local epoch must be set if tasks are assigned."); @@ -104,16 +108,19 @@ public class StreamsMembershipManager implements RequestManager { Optional<LocalAssignment> updateWith(final Map<String, SortedSet<Integer>> activeTasks, final Map<String, SortedSet<Integer>> standbyTasks, - final Map<String, SortedSet<Integer>> warmupTasks) { + final Map<String, SortedSet<Integer>> warmupTasks, + final boolean isGroupReady) { if (localEpoch != NONE_EPOCH && activeTasks.equals(this.activeTasks) && 
standbyTasks.equals(this.standbyTasks) && - warmupTasks.equals(this.warmupTasks)) { + warmupTasks.equals(this.warmupTasks) && + isGroupReady == this.isGroupReady + ) { return Optional.empty(); } long nextLocalEpoch = localEpoch + 1; - return Optional.of(new LocalAssignment(nextLocalEpoch, activeTasks, standbyTasks, warmupTasks)); + return Optional.of(new LocalAssignment(nextLocalEpoch, activeTasks, standbyTasks, warmupTasks, isGroupReady)); } @Override @@ -123,6 +130,7 @@ public class StreamsMembershipManager implements RequestManager { ", activeTasks=" + activeTasks + ", standbyTasks=" + standbyTasks + ", warmupTasks=" + warmupTasks + + ", isGroupReady=" + isGroupReady + '}'; } @@ -134,12 +142,13 @@ public class StreamsMembershipManager implements RequestManager { return localEpoch == that.localEpoch && Objects.equals(activeTasks, that.activeTasks) && Objects.equals(standbyTasks, that.standbyTasks) && - Objects.equals(warmupTasks, that.warmupTasks); + Objects.equals(warmupTasks, that.warmupTasks) && + isGroupReady == that.isGroupReady; } @Override public int hashCode() { - return Objects.hash(localEpoch, activeTasks, standbyTasks, warmupTasks); + return Objects.hash(localEpoch, activeTasks, standbyTasks, warmupTasks, isGroupReady); } } @@ -686,9 +695,9 @@ public class StreamsMembershipManager implements RequestManager { final List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks = responseData.activeTasks(); final List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks = responseData.standbyTasks(); final List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks = responseData.warmupTasks(); + final boolean isGroupReady = isGroupReady(responseData.status()); if (activeTasks != null && standbyTasks != null && warmupTasks != null) { - if (!state.canHandleNewAssignment()) { log.debug("Ignoring new assignment: active tasks {}, standby tasks {}, and warm-up tasks {} received " + "from server because member is in {} state.", @@ -699,17 +708,39 @@ public 
class StreamsMembershipManager implements RequestManager { processAssignmentReceived( toTasksAssignment(activeTasks), toTasksAssignment(standbyTasks), - toTasksAssignment(warmupTasks) + toTasksAssignment(warmupTasks), + isGroupReady ); - } else { - if (responseData.activeTasks() != null || - responseData.standbyTasks() != null || - responseData.warmupTasks() != null) { + } else if (responseData.activeTasks() != null || responseData.standbyTasks() != null || responseData.warmupTasks() != null) { + throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: " + + responseData); + } else if (isGroupReady != targetAssignment.isGroupReady) { + // If the client did not provide a new assignment, but the group is now ready or not ready anymore, so + // update the target assignment and reconcile it. + processAssignmentReceived( + targetAssignment.activeTasks, + targetAssignment.standbyTasks, + targetAssignment.warmupTasks, + isGroupReady + ); + } + } - throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: " - + responseData); + private boolean isGroupReady(List<StreamsGroupHeartbeatResponseData.Status> statuses) { + if (statuses != null) { + for (final StreamsGroupHeartbeatResponseData.Status status : statuses) { + switch (StreamsGroupHeartbeatResponse.Status.fromCode(status.statusCode())) { + case MISSING_SOURCE_TOPICS: + case MISSING_INTERNAL_TOPICS: + case INCORRECTLY_PARTITIONED_TOPICS: + case ASSIGNMENT_DELAYED: + return false; + default: + // continue checking other statuses + } } } + return true; } /** @@ -952,11 +983,13 @@ public class StreamsMembershipManager implements RequestManager { * @param activeTasks Target active tasks assignment received from the broker. * @param standbyTasks Target standby tasks assignment received from the broker. * @param warmupTasks Target warm-up tasks assignment received from the broker. 
+ * @param isGroupReady True if the group is ready, false otherwise. */ private void processAssignmentReceived(Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, - Map<String, SortedSet<Integer>> warmupTasks) { - replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, warmupTasks); + Map<String, SortedSet<Integer>> warmupTasks, + boolean isGroupReady) { + replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, warmupTasks, isGroupReady); if (!targetAssignmentReconciled()) { transitionTo(MemberState.RECONCILING); } else { @@ -975,8 +1008,9 @@ public class StreamsMembershipManager implements RequestManager { private void replaceTargetAssignmentWithNewAssignment(Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, - Map<String, SortedSet<Integer>> warmupTasks) { - targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks) + Map<String, SortedSet<Integer>> warmupTasks, + boolean isGroupReady) { + targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks, isGroupReady) .ifPresent(updatedAssignment -> { log.debug("Target assignment updated from {} to {}. 
Member will reconcile it on the next poll.", targetAssignment, updatedAssignment); @@ -1025,8 +1059,9 @@ public class StreamsMembershipManager implements RequestManager { SortedSet<StreamsRebalanceData.TaskId> ownedStandbyTasks = toTaskIdSet(currentAssignment.standbyTasks); SortedSet<StreamsRebalanceData.TaskId> assignedWarmupTasks = toTaskIdSet(targetAssignment.warmupTasks); SortedSet<StreamsRebalanceData.TaskId> ownedWarmupTasks = toTaskIdSet(currentAssignment.warmupTasks); + boolean isGroupReady = targetAssignment.isGroupReady; - log.info("Assigned tasks with local epoch {}\n" + + log.info("Assigned tasks with local epoch {} and group {}\n" + "\tMember: {}\n" + "\tAssigned active tasks: {}\n" + "\tOwned active tasks: {}\n" + @@ -1036,6 +1071,7 @@ public class StreamsMembershipManager implements RequestManager { "\tAssigned warm-up tasks: {}\n" + "\tOwned warm-up tasks: {}\n", targetAssignment.localEpoch, + isGroupReady ? "is ready" : "is not ready", memberId, assignedActiveTasks, ownedActiveTasks, @@ -1064,7 +1100,7 @@ public class StreamsMembershipManager implements RequestManager { final CompletableFuture<Void> tasksRevokedAndAssigned = tasksRevoked.thenCompose(__ -> { if (!maybeAbortReconciliation()) { - return assignTasks(assignedActiveTasks, ownedActiveTasks, assignedStandbyTasks, assignedWarmupTasks); + return assignTasks(assignedActiveTasks, ownedActiveTasks, assignedStandbyTasks, assignedWarmupTasks, isGroupReady); } return CompletableFuture.completedFuture(null); }); @@ -1117,7 +1153,8 @@ public class StreamsMembershipManager implements RequestManager { private CompletableFuture<Void> assignTasks(final SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign, final SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks, final SortedSet<StreamsRebalanceData.TaskId> standbyTasksToAssign, - final SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign) { + final SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign, + final boolean isGroupReady) 
{ log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up tasks {{}} to the member.", activeTasksToAssign.stream() .map(StreamsRebalanceData.TaskId::toString) @@ -1145,7 +1182,8 @@ public class StreamsMembershipManager implements RequestManager { new StreamsRebalanceData.Assignment( activeTasksToAssign, standbyTasksToAssign, - warmupTasksToAssign + warmupTasksToAssign, + isGroupReady ) ); onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java index c6fe1fd9215..ac3c53c47d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java @@ -157,18 +157,23 @@ public class StreamsRebalanceData { private final Set<TaskId> warmupTasks; + private final boolean isGroupReady; + private Assignment() { this.activeTasks = Set.of(); this.standbyTasks = Set.of(); this.warmupTasks = Set.of(); + this.isGroupReady = false; } public Assignment(final Set<TaskId> activeTasks, final Set<TaskId> standbyTasks, - final Set<TaskId> warmupTasks) { + final Set<TaskId> warmupTasks, + final boolean isGroupReady) { this.activeTasks = Set.copyOf(Objects.requireNonNull(activeTasks, "Active tasks cannot be null")); this.standbyTasks = Set.copyOf(Objects.requireNonNull(standbyTasks, "Standby tasks cannot be null")); this.warmupTasks = Set.copyOf(Objects.requireNonNull(warmupTasks, "Warmup tasks cannot be null")); + this.isGroupReady = isGroupReady; } public Set<TaskId> activeTasks() { @@ -183,6 +188,10 @@ public class StreamsRebalanceData { return warmupTasks; } + public boolean isGroupReady() { + return isGroupReady; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -194,16 +203,17 @@ public class 
StreamsRebalanceData { final Assignment that = (Assignment) o; return Objects.equals(activeTasks, that.activeTasks) && Objects.equals(standbyTasks, that.standbyTasks) - && Objects.equals(warmupTasks, that.warmupTasks); + && Objects.equals(warmupTasks, that.warmupTasks) + && isGroupReady == that.isGroupReady; } @Override public int hashCode() { - return Objects.hash(activeTasks, standbyTasks, warmupTasks); + return Objects.hash(activeTasks, standbyTasks, warmupTasks, isGroupReady); } public Assignment copy() { - return new Assignment(activeTasks, standbyTasks, warmupTasks); + return new Assignment(activeTasks, standbyTasks, warmupTasks, isGroupReady); } @Override @@ -212,6 +222,7 @@ public class StreamsRebalanceData { "activeTasks=" + activeTasks + ", standbyTasks=" + standbyTasks + ", warmupTasks=" + warmupTasks + + ", isGroupReady=" + isGroupReady + '}'; } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java index 32fe55f12cd..87c10a98d37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -81,7 +82,18 @@ public class StreamsGroupHeartbeatResponse extends AbstractResponse { MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing or a source topic regex resolves to zero topics."), INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected to be copartitioned are not copartitioned."), MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are missing."), - SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application."); + 
SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application."), + ASSIGNMENT_DELAYED((byte) 5, "The assignment was delayed by the coordinator."); + + private static final Map<Byte, Status> CODE_TO_STATUS; + + static { + Map<Byte, Status> map = new HashMap<>(); + for (Status status : values()) { + map.put(status.code, status); + } + CODE_TO_STATUS = Collections.unmodifiableMap(map); + } private final byte code; private final String message; @@ -98,5 +110,13 @@ public class StreamsGroupHeartbeatResponse extends AbstractResponse { public String message() { return message; } + + public static Status fromCode(byte code) { + Status status = CODE_TO_STATUS.get(code); + if (status == null) { + throw new IllegalArgumentException("Unknown code " + code); + } + return status; + } } } diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 220c9704a23..27cf47bb1a4 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -94,6 +94,7 @@ // The group coordinator will attempt to create all missing internal topics, if any errors occur during // topic creation, this will be indicated in StatusDetail. // 4 - SHUTDOWN_APPLICATION - A client requested the shutdown of the whole application. + // 5 - ASSIGNMENT_DELAYED - No assignment was provided because assignment computation was delayed. 
{ "name": "StatusCode", "type": "int8", "versions": "0+", "about": "A code to indicate that a particular status is active for the group membership" }, { "name": "StatusDetail", "type": "string", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 9e4b8437144..35392e99398 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -821,7 +821,8 @@ class StreamsGroupHeartbeatRequestManagerTest { new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) - ) + ), + true ) ); @@ -870,7 +871,8 @@ class StreamsGroupHeartbeatRequestManagerTest { new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2) ), Set.of( - ) + ), + true ) ); @@ -923,7 +925,8 @@ class StreamsGroupHeartbeatRequestManagerTest { new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5) - ) + ), + true ) ); StreamsGroupHeartbeatRequestData requestDataBeforeReset = heartbeatState.buildRequestData(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java index c0255a4d29a..50e582f5cc4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java @@ -72,6 +72,7 @@ import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -1937,6 +1938,242 @@ public class StreamsMembershipManagerTest { verifyInStateUnsubscribed(membershipManager); } + @Test + public void testIsGroupReadyWithMissingSourceTopicsStatus() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) + .setStatusDetail("One or more source topics are missing.") + ); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + false + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyWithMissingInternalTopicsStatus() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) + .setStatusDetail("One or more internal topics are missing.") + ); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + 
Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + false + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyWithIncorrectlyPartitionedTopicsStatus() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) + .setStatusDetail("One or more topics expected to be copartitioned are not copartitioned.") + ); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + false + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyWithAssignmentDelayedStatus() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.") + ); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 
MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + false + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyWithNoStatuses() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + null + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + true + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyWithOtherStatuses() { + setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code()) + .setStatusDetail("The topology epoch supplied is inconsistent with the topology for this streams group.") + ); + + final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(response); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + true + ); + + future.complete(null); + } + + @Test + public void testIsGroupReadyChangeWhenTasksAreNull() { + 
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0); + joining(); + + final StreamsGroupHeartbeatResponse responseWithTasks = makeHeartbeatResponse( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + MEMBER_EPOCH, + null + ); + + membershipManager.onHeartbeatSuccess(responseWithTasks); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future1 = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + true + ); + future1.complete(null); + + final List<StreamsGroupHeartbeatResponseData.Status> statuses = List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.") + ); + + final StreamsGroupHeartbeatResponse responseWithoutTasks = makeHeartbeatResponse( + null, + null, + null, + MEMBER_EPOCH, + statuses + ); + + membershipManager.onHeartbeatSuccess(responseWithoutTasks); + membershipManager.poll(time.milliseconds()); + + final CompletableFuture<Void> future2 = verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler( + Set.of(), + Set.of(), + Set.of(), + false + ); + future2.complete(null); + } + private void verifyThatNoTasksHaveBeenRevoked() { verify(backgroundEventHandler, never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class)); verify(subscriptionState, never()).markPendingRevocation(any()); @@ -2036,9 +2273,16 @@ public class StreamsMembershipManagerTest { private CompletableFuture<Void> verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final Set<StreamsRebalanceData.TaskId> activeTasks, final Set<StreamsRebalanceData.TaskId> standbyTasks, final Set<StreamsRebalanceData.TaskId> warmupTasks) { + return verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, standbyTasks, warmupTasks, 
true); + } + + private CompletableFuture<Void> verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final Set<StreamsRebalanceData.TaskId> activeTasks, + final Set<StreamsRebalanceData.TaskId> standbyTasks, + final Set<StreamsRebalanceData.TaskId> warmupTasks, + final boolean isGroupReady) { verify(backgroundEventHandler, times(++onTasksAssignedCallbackNeededAddCount)).add(onTasksAssignedCallbackNeededEventCaptor.capture()); final StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeeded = onTasksAssignedCallbackNeededEventCaptor.getValue(); - assertEquals(makeTaskAssignment(activeTasks, standbyTasks, warmupTasks), onTasksAssignedCallbackNeeded.assignment()); + assertEquals(makeTaskAssignment(activeTasks, standbyTasks, warmupTasks, isGroupReady), onTasksAssignedCallbackNeeded.assignment()); return onTasksAssignedCallbackNeeded.future(); } @@ -2072,7 +2316,7 @@ public class StreamsMembershipManagerTest { private void setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(final String subtopologyId, final String topicName) { - when(streamsRebalanceData.subtopologies()).thenReturn( + lenient().when(streamsRebalanceData.subtopologies()).thenReturn( mkMap( mkEntry( subtopologyId, @@ -2092,7 +2336,7 @@ public class StreamsMembershipManagerTest { final String topicName1, final String subtopologyId2, final String topicName2) { - when(streamsRebalanceData.subtopologies()).thenReturn( + lenient().when(streamsRebalanceData.subtopologies()).thenReturn( mkMap( mkEntry( subtopologyId1, @@ -2199,6 +2443,14 @@ public class StreamsMembershipManagerTest { final List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks, final List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks, final int memberEpoch) { + return makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks, memberEpoch, null); + } + + private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks, + final 
List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks, + final List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks, + final int memberEpoch, + final List<StreamsGroupHeartbeatResponseData.Status> statuses) { final StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData() .setErrorCode(Errors.NONE.code()) .setMemberId(membershipManager.memberId()) @@ -2206,16 +2458,27 @@ public class StreamsMembershipManagerTest { .setActiveTasks(activeTasks) .setStandbyTasks(standbyTasks) .setWarmupTasks(warmupTasks); + if (statuses != null) { + responseData.setStatus(statuses); + } return new StreamsGroupHeartbeatResponse(responseData); } private StreamsRebalanceData.Assignment makeTaskAssignment(final Set<StreamsRebalanceData.TaskId> activeTasks, final Set<StreamsRebalanceData.TaskId> standbyTasks, final Set<StreamsRebalanceData.TaskId> warmupTasks) { + return makeTaskAssignment(activeTasks, standbyTasks, warmupTasks, true); + } + + private StreamsRebalanceData.Assignment makeTaskAssignment(final Set<StreamsRebalanceData.TaskId> activeTasks, + final Set<StreamsRebalanceData.TaskId> standbyTasks, + final Set<StreamsRebalanceData.TaskId> warmupTasks, + final boolean isGroupReady) { return new StreamsRebalanceData.Assignment( activeTasks, standbyTasks, - warmupTasks + warmupTasks, + isGroupReady ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java index f2376640c01..237dd4f0993 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -92,7 +92,8 @@ public class StreamsRebalanceDataTest { final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( Set.of(new 
StreamsRebalanceData.TaskId("subtopologyId1", 1)), Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)), - Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)) + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)), + true ); assertThrows( @@ -111,11 +112,11 @@ public class StreamsRebalanceDataTest { @Test public void assignmentShouldNotAcceptNulls() { - final Exception exception1 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of())); + final Exception exception1 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of(), true)); assertEquals("Active tasks cannot be null", exception1.getMessage()); - final Exception exception2 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of())); + final Exception exception2 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of(), true)); assertEquals("Standby tasks cannot be null", exception2.getMessage()); - final Exception exception3 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null)); + final Exception exception3 = assertThrows(NullPointerException.class, () -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null, true)); assertEquals("Warmup tasks cannot be null", exception3.getMessage()); } @@ -125,43 +126,56 @@ public class StreamsRebalanceDataTest { final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)), Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)), - Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)) + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)), + true ); final StreamsRebalanceData.Assignment assignmentEqual = new StreamsRebalanceData.Assignment( assignment.activeTasks(), 
assignment.standbyTasks(), - assignment.warmupTasks() + assignment.warmupTasks(), + assignment.isGroupReady() ); Set<StreamsRebalanceData.TaskId> unequalActiveTasks = new HashSet<>(assignment.activeTasks()); unequalActiveTasks.add(additionalTask); final StreamsRebalanceData.Assignment assignmentUnequalActiveTasks = new StreamsRebalanceData.Assignment( unequalActiveTasks, assignment.standbyTasks(), - assignment.warmupTasks() + assignment.warmupTasks(), + assignment.isGroupReady() ); Set<StreamsRebalanceData.TaskId> unequalStandbyTasks = new HashSet<>(assignment.standbyTasks()); unequalStandbyTasks.add(additionalTask); final StreamsRebalanceData.Assignment assignmentUnequalStandbyTasks = new StreamsRebalanceData.Assignment( assignment.activeTasks(), unequalStandbyTasks, - assignment.warmupTasks() + assignment.warmupTasks(), + assignment.isGroupReady() ); Set<StreamsRebalanceData.TaskId> unequalWarmupTasks = new HashSet<>(assignment.warmupTasks()); unequalWarmupTasks.add(additionalTask); final StreamsRebalanceData.Assignment assignmentUnequalWarmupTasks = new StreamsRebalanceData.Assignment( assignment.activeTasks(), assignment.standbyTasks(), - unequalWarmupTasks + unequalWarmupTasks, + assignment.isGroupReady() + ); + final StreamsRebalanceData.Assignment assignmentUnequalIsGroupReady = new StreamsRebalanceData.Assignment( + assignment.activeTasks(), + assignment.standbyTasks(), + assignment.warmupTasks(), + !assignment.isGroupReady() ); assertEquals(assignment, assignmentEqual); assertNotEquals(assignment, assignmentUnequalActiveTasks); assertNotEquals(assignment, assignmentUnequalStandbyTasks); assertNotEquals(assignment, assignmentUnequalWarmupTasks); + assertNotEquals(assignment, assignmentUnequalIsGroupReady); assertEquals(assignment.hashCode(), assignmentEqual.hashCode()); assertNotEquals(assignment.hashCode(), assignmentUnequalActiveTasks.hashCode()); assertNotEquals(assignment.hashCode(), assignmentUnequalStandbyTasks.hashCode()); 
assertNotEquals(assignment.hashCode(), assignmentUnequalWarmupTasks.hashCode()); + assertNotEquals(assignment.hashCode(), assignmentUnequalIsGroupReady.hashCode()); } @Test @@ -169,7 +183,8 @@ public class StreamsRebalanceDataTest { final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)), Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)), - Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)) + Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)), + true ); final StreamsRebalanceData.Assignment copy = assignment.copy(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java index 749a4594ab8..b7c7275df58 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java @@ -60,7 +60,7 @@ public class StreamsRebalanceListenerInvokerTest { @Test public void testSetRebalanceListenerWithNull() { - NullPointerException exception = assertThrows(NullPointerException.class, + NullPointerException exception = assertThrows(NullPointerException.class, () -> invoker.setRebalanceListener(null)); assertEquals("StreamsRebalanceListener cannot be null", exception.getMessage()); } @@ -96,12 +96,12 @@ public class StreamsRebalanceListenerInvokerTest { @Test public void testInvokeAllTasksRevokedWithListener() { invoker.setRebalanceListener(mockListener); - + StreamsRebalanceData.Assignment mockAssignment = createMockAssignment(); when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment); Exception result = invoker.invokeAllTasksRevoked(); - + assertNull(result); 
verify(mockListener).onTasksRevoked(eq(mockAssignment.activeTasks())); } @@ -112,7 +112,7 @@ public class StreamsRebalanceListenerInvokerTest { StreamsRebalanceData.Assignment assignment = createMockAssignment(); Exception result = invoker.invokeTasksAssigned(assignment); - + assertNull(result); verify(mockListener).onTasksAssigned(eq(assignment)); } @@ -123,10 +123,10 @@ public class StreamsRebalanceListenerInvokerTest { StreamsRebalanceData.Assignment assignment = createMockAssignment(); WakeupException wakeupException = new WakeupException(); doThrow(wakeupException).when(mockListener).onTasksAssigned(assignment); - - WakeupException thrownException = assertThrows(WakeupException.class, + + WakeupException thrownException = assertThrows(WakeupException.class, () -> invoker.invokeTasksAssigned(assignment)); - + assertEquals(wakeupException, thrownException); verify(mockListener).onTasksAssigned(eq(assignment)); } @@ -137,10 +137,10 @@ public class StreamsRebalanceListenerInvokerTest { StreamsRebalanceData.Assignment assignment = createMockAssignment(); InterruptException interruptException = new InterruptException("Test interrupt"); doThrow(interruptException).when(mockListener).onTasksAssigned(assignment); - - InterruptException thrownException = assertThrows(InterruptException.class, + + InterruptException thrownException = assertThrows(InterruptException.class, () -> invoker.invokeTasksAssigned(assignment)); - + assertEquals(interruptException, thrownException); verify(mockListener).onTasksAssigned(eq(assignment)); } @@ -151,9 +151,9 @@ public class StreamsRebalanceListenerInvokerTest { StreamsRebalanceData.Assignment assignment = createMockAssignment(); RuntimeException runtimeException = new RuntimeException("Test exception"); doThrow(runtimeException).when(mockListener).onTasksAssigned(assignment); - + Exception result = invoker.invokeTasksAssigned(assignment); - + assertEquals(runtimeException, result); verify(mockListener).onTasksAssigned(eq(assignment)); 
} @@ -164,7 +164,7 @@ public class StreamsRebalanceListenerInvokerTest { Set<StreamsRebalanceData.TaskId> tasks = createMockTasks(); Exception result = invoker.invokeTasksRevoked(tasks); - + assertNull(result); verify(mockListener).onTasksRevoked(eq(tasks)); } @@ -175,10 +175,10 @@ public class StreamsRebalanceListenerInvokerTest { Set<StreamsRebalanceData.TaskId> tasks = createMockTasks(); WakeupException wakeupException = new WakeupException(); doThrow(wakeupException).when(mockListener).onTasksRevoked(tasks); - - WakeupException thrownException = assertThrows(WakeupException.class, + + WakeupException thrownException = assertThrows(WakeupException.class, () -> invoker.invokeTasksRevoked(tasks)); - + assertEquals(wakeupException, thrownException); verify(mockListener).onTasksRevoked(eq(tasks)); } @@ -189,10 +189,10 @@ public class StreamsRebalanceListenerInvokerTest { Set<StreamsRebalanceData.TaskId> tasks = createMockTasks(); InterruptException interruptException = new InterruptException("Test interrupt"); doThrow(interruptException).when(mockListener).onTasksRevoked(tasks); - - InterruptException thrownException = assertThrows(InterruptException.class, + + InterruptException thrownException = assertThrows(InterruptException.class, () -> invoker.invokeTasksRevoked(tasks)); - + assertEquals(interruptException, thrownException); verify(mockListener).onTasksRevoked(eq(tasks)); } @@ -203,9 +203,9 @@ public class StreamsRebalanceListenerInvokerTest { Set<StreamsRebalanceData.TaskId> tasks = createMockTasks(); RuntimeException runtimeException = new RuntimeException("Test exception"); doThrow(runtimeException).when(mockListener).onTasksRevoked(tasks); - + Exception result = invoker.invokeTasksRevoked(tasks); - + assertEquals(runtimeException, result); verify(mockListener).onTasksRevoked(eq(tasks)); } @@ -215,7 +215,7 @@ public class StreamsRebalanceListenerInvokerTest { invoker.setRebalanceListener(mockListener); Exception result = invoker.invokeAllTasksLost(); - + 
assertNull(result); verify(mockListener).onAllTasksLost(); } @@ -225,10 +225,10 @@ public class StreamsRebalanceListenerInvokerTest { invoker.setRebalanceListener(mockListener); WakeupException wakeupException = new WakeupException(); doThrow(wakeupException).when(mockListener).onAllTasksLost(); - - WakeupException thrownException = assertThrows(WakeupException.class, + + WakeupException thrownException = assertThrows(WakeupException.class, () -> invoker.invokeAllTasksLost()); - + assertEquals(wakeupException, thrownException); verify(mockListener).onAllTasksLost(); } @@ -238,10 +238,10 @@ public class StreamsRebalanceListenerInvokerTest { invoker.setRebalanceListener(mockListener); InterruptException interruptException = new InterruptException("Test interrupt"); doThrow(interruptException).when(mockListener).onAllTasksLost(); - - InterruptException thrownException = assertThrows(InterruptException.class, + + InterruptException thrownException = assertThrows(InterruptException.class, () -> invoker.invokeAllTasksLost()); - + assertEquals(interruptException, thrownException); verify(mockListener).onAllTasksLost(); } @@ -251,9 +251,9 @@ public class StreamsRebalanceListenerInvokerTest { invoker.setRebalanceListener(mockListener); RuntimeException runtimeException = new RuntimeException("Test exception"); doThrow(runtimeException).when(mockListener).onAllTasksLost(); - + Exception result = invoker.invokeAllTasksLost(); - + assertEquals(runtimeException, result); verify(mockListener).onAllTasksLost(); } @@ -262,8 +262,8 @@ public class StreamsRebalanceListenerInvokerTest { Set<StreamsRebalanceData.TaskId> activeTasks = createMockTasks(); Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(); Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(); - - return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks); + + return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks, true); } private 
Set<StreamsRebalanceData.TaskId> createMockTasks() { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 347ba953ca2..0c6310b5e1d 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -3845,6 +3845,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val response = sendRequestAndVerifyResponseError(request, resource, isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse] assertEquals( util.List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay."), + new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) .setStatusDetail("Internal topics are missing: [topic]; Unauthorized to CREATE on topics topic.")), response.data().status()) @@ -3876,6 +3879,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { // Request successful, and no internal topic creation error. 
assertEquals( util.List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay."), + new StreamsGroupHeartbeatResponseData.Status() .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()) .setStatusDetail("Internal topics are missing: [topic]")), response.data().status()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 6e6eadecd1e..d5f22bf33ba 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2069,6 +2069,12 @@ public class GroupMetadataManager { // During initial rebalance delay, return empty assignment to first joining members. 
targetAssignmentEpoch = 1; targetAssignment = TasksTuple.EMPTY; + + returnedStatus.add( + new Status() + .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.") + ); } else { targetAssignment = updateStreamsTargetAssignment( group, @@ -2156,6 +2162,7 @@ public class GroupMetadataManager { )); response.setStatus(returnedStatus); + return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated)); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 3f24ba06f4e..761f899f589 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -17582,7 +17582,11 @@ public class GroupMetadataManagerTest { .setWarmupTasks(List.of()) .setPartitionsByUserEndpoint(null) .setEndpointInformationEpoch(0) - .setStatus(List.of()), + .setStatus(List.of( + new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.ASSIGNMENT_DELAYED.code()) + .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.") + )), result.response().data() ); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index 121b6d5217a..df993ac81ad 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -247,6 +247,11 @@ 
public class KafkaStreamsTelemetryIntegrationTest { return "org.apache.kafka." + group + "." + name; }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics .sorted().toList(); + TestUtils.waitForCondition( + () -> TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).size() == expectedMetrics.size(), + 30_000, + "Never received enough metrics" + ); final List<String> actualMetrics = new ArrayList<>(TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId)); assertEquals(expectedMetrics, actualMetrics); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index a56723abc33..b31cee581b0 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; @@ -48,15 +49,17 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import 
java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Properties; import java.util.stream.Collectors; @@ -339,8 +342,13 @@ public class MetricsIntegrationTest { () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms"); } - @Test - public void shouldAddMetricsOnAllLevels() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldAddMetricsOnAllLevels(final boolean streamsProtocolEnabled) throws Exception { + if (streamsProtocolEnabled) { + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + } + builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String())); builder.table(STREAM_OUTPUT_1, @@ -373,8 +381,13 @@ public class MetricsIntegrationTest { checkMetricsDeregistration(); } - @Test - public void shouldAddMetricsForWindowStoreAndSuppressionBuffer() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final boolean streamsProtocolEnabled) throws Exception { + if (streamsProtocolEnabled) { + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + } + final Duration windowSize = Duration.ofMillis(50); builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) .groupByKey() @@ -401,8 +414,13 @@ public class MetricsIntegrationTest { checkMetricsDeregistration(); } - @Test - public void shouldAddMetricsForSessionStore() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldAddMetricsForSessionStore(final boolean streamsProtocolEnabled) throws Exception { + if (streamsProtocolEnabled) { + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + } + final Duration inactivityGap = Duration.ofMillis(50); builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) .groupByKey() diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 54aa5dd5f0d..d77355831bc 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.GroupProtocol; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -42,15 +43,17 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -142,9 +145,13 @@ public class RocksDBMetricsIntegrationTest { void verify(final KafkaStreams kafkaStreams, final String metricScope) throws Exception; } - @Test - public void 
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final TestInfo testInfo) throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean streamsProtocolEnabled, final TestInfo testInfo) throws Exception { final Properties streamsConfiguration = streamsConfig(testInfo); + if (streamsProtocolEnabled) { + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); final StreamsBuilder builder = builderForStateStores(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java index 41accecb1b1..1cdfcba6810 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java @@ -96,6 +96,7 @@ public class DefaultStreamsRebalanceListener implements StreamsRebalanceListener log.info("Processing new assignment {} from Streams Rebalance Protocol", assignment); try { + streamThread.setStreamsGroupReady(assignment.isGroupReady()); taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions); streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED); taskManager.handleRebalanceComplete(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f208567c32d..41d8dbcc0ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -339,6 +339,7 @@ public class StreamThread extends Thread implements ProcessingThread { private long lastPurgeMs; private long lastPartitionAssignedMs = -1L; private int numIterations; + private boolean streamsGroupReady = false; private volatile State state = State.CREATED; private volatile ThreadMetadata threadMetadata; private StreamThread.StateListener stateListener; @@ -1209,7 +1210,15 @@ public class StreamThread extends Thread implements ProcessingThread { pollLatency = pollPhase(); totalPolledSinceLastSummary += 1; - handleStreamsRebalanceData(); + if (streamsRebalanceData.isPresent()) { + // Always handle status codes (e.g., MISSING_SOURCE_TOPICS, INCORRECTLY_PARTITIONED_TOPICS) + // regardless of streamsGroupReady, as these may throw exceptions that need to be handled. + handleStreamsRebalanceData(); + + if (!streamsGroupReady) { + return; + } + } // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). @@ -1360,7 +1369,15 @@ public class StreamThread extends Thread implements ProcessingThread { taskManager.resumePollingForPartitionsWithAvailableSpace(); pollLatency = pollPhase(); - handleStreamsRebalanceData(); + if (streamsRebalanceData.isPresent()) { + // Always handle status codes (e.g., MISSING_SOURCE_TOPICS, INCORRECTLY_PARTITIONED_TOPICS) + // regardless of streamsGroupReady, as these may throw exceptions that need to be handled. + handleStreamsRebalanceData(); + + if (!streamsGroupReady) { + return; + } + } // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). 
@@ -1544,6 +1561,17 @@ public class StreamThread extends Thread implements ProcessingThread { return records; } + /** + * Sets the readiness state of the Streams group for this thread. + * + * @param ready {@code true} if the Streams group is ready to process records; {@code false} otherwise. + * When set to {@code true}, this thread may transition to an active processing state. + * When set to {@code false}, the thread will not process records until the group is ready. + */ + public void setStreamsGroupReady(final boolean ready) { + streamsGroupReady = ready; + } + public void handleStreamsRebalanceData() { if (streamsRebalanceData.isPresent()) { boolean hasMissingSourceTopics = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 0e9e013f628..81103b3a2ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -173,12 +173,14 @@ public class DefaultStreamsRebalanceListenerTest { final StreamsRebalanceData.Assignment assignment = new StreamsRebalanceData.Assignment( Set.of(new StreamsRebalanceData.TaskId("1", 0)), Set.of(new StreamsRebalanceData.TaskId("2", 0)), - Set.of(new StreamsRebalanceData.TaskId("3", 0)) + Set.of(new StreamsRebalanceData.TaskId("3", 0)), + false ); assertDoesNotThrow(() -> defaultStreamsRebalanceListener.onTasksAssigned(assignment)); final InOrder inOrder = inOrder(taskManager, streamThread, streamsRebalanceData); + inOrder.verify(streamThread).setStreamsGroupReady(false); inOrder.verify(taskManager).handleAssignment( Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), Map.of( @@ -201,10 +203,11 @@ public class 
DefaultStreamsRebalanceListenerTest { createRebalanceListenerWithRebalanceData(streamsRebalanceData); final Exception actualException = assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksAssigned( - new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), false) )); assertEquals(exception, actualException); + verify(streamThread).setStreamsGroupReady(false); verify(taskManager).handleAssignment(any(), any()); verify(streamThread, never()).setState(StreamThread.State.PARTITIONS_ASSIGNED); verify(taskManager, never()).handleRebalanceComplete(); @@ -302,11 +305,13 @@ public class DefaultStreamsRebalanceListenerTest { new StreamsRebalanceData.Assignment( Set.of(new StreamsRebalanceData.TaskId("1", 0)), Set.of(), - Set.of() + Set.of(), + true ) ); verify(tasksAssignedSensor).record(150L); + verify(streamThread).setStreamsGroupReady(true); verify(taskManager).handleAssignment( Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), Map.of() @@ -376,10 +381,11 @@ public class DefaultStreamsRebalanceListenerTest { createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksAssigned( - new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of()) + new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), false) )); verify(tasksAssignedSensor).record(75L); + verify(streamThread).setStreamsGroupReady(false); verify(taskManager).handleAssignment(any(), any()); }
