This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c7f71697728 KAFKA-19978: Don't enter running state as long as group is
not ready (#21110)
c7f71697728 is described below
commit c7f7169772873cc36cc344af9b0d96034ff9c774
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Dec 11 01:22:18 2025 +0100
KAFKA-19978: Don't enter running state as long as group is not ready
(#21110)
Since internal topics are created asynchronous, and we do not want to
have heartbeats failing, when an application has not yet created its
internal topics, it remains in NOT_READY, until the internal topics are
created.
The way the client-side is implemented right now, the Kafka Streams
application will still enter RUNNING state with an empty assignment,
when internal topics are missing. From the outside, it will therefore
look like the initial set-up of the application has finished, while the
application is still waiting for the group to become ready.
This propagates the state of the group as part of the assignment, so
that we delay entering a "RUNNING" state until the group has become
ready.
Reviewers: Matthias J. Sax <[email protected]>
---
.../internals/StreamsMembershipManager.java | 84 +++++--
.../consumer/internals/StreamsRebalanceData.java | 19 +-
.../requests/StreamsGroupHeartbeatResponse.java | 22 +-
.../message/StreamsGroupHeartbeatResponse.json | 1 +
.../StreamsGroupHeartbeatRequestManagerTest.java | 9 +-
.../internals/StreamsMembershipManagerTest.java | 271 ++++++++++++++++++++-
.../internals/StreamsRebalanceDataTest.java | 35 ++-
.../StreamsRebalanceListenerInvokerTest.java | 64 ++---
.../kafka/api/AuthorizerIntegrationTest.scala | 6 +
.../coordinator/group/GroupMetadataManager.java | 7 +
.../group/GroupMetadataManagerTest.java | 6 +-
.../KafkaStreamsTelemetryIntegrationTest.java | 5 +
.../integration/MetricsIntegrationTest.java | 32 ++-
.../integration/RocksDBMetricsIntegrationTest.java | 13 +-
.../internals/DefaultStreamsRebalanceListener.java | 1 +
.../streams/processor/internals/StreamThread.java | 32 ++-
.../DefaultStreamsRebalanceListenerTest.java | 14 +-
17 files changed, 527 insertions(+), 94 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
index 84ac83125be..fcb0b8ad6c6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java
@@ -80,22 +80,26 @@ public class StreamsMembershipManager implements
RequestManager {
NONE_EPOCH,
Collections.emptyMap(),
Collections.emptyMap(),
- Collections.emptyMap()
+ Collections.emptyMap(),
+ false
);
public final long localEpoch;
public final Map<String, SortedSet<Integer>> activeTasks;
public final Map<String, SortedSet<Integer>> standbyTasks;
public final Map<String, SortedSet<Integer>> warmupTasks;
+ public final boolean isGroupReady;
public LocalAssignment(final long localEpoch,
final Map<String, SortedSet<Integer>>
activeTasks,
final Map<String, SortedSet<Integer>>
standbyTasks,
- final Map<String, SortedSet<Integer>>
warmupTasks) {
+ final Map<String, SortedSet<Integer>>
warmupTasks,
+ final boolean isGroupReady) {
this.localEpoch = localEpoch;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.warmupTasks = warmupTasks;
+ this.isGroupReady = isGroupReady;
if (localEpoch == NONE_EPOCH &&
(!activeTasks.isEmpty() || !standbyTasks.isEmpty() ||
!warmupTasks.isEmpty())) {
throw new IllegalArgumentException("Local epoch must be set if
tasks are assigned.");
@@ -104,16 +108,19 @@ public class StreamsMembershipManager implements
RequestManager {
Optional<LocalAssignment> updateWith(final Map<String,
SortedSet<Integer>> activeTasks,
final Map<String,
SortedSet<Integer>> standbyTasks,
- final Map<String,
SortedSet<Integer>> warmupTasks) {
+ final Map<String,
SortedSet<Integer>> warmupTasks,
+ final boolean isGroupReady) {
if (localEpoch != NONE_EPOCH &&
activeTasks.equals(this.activeTasks) &&
standbyTasks.equals(this.standbyTasks) &&
- warmupTasks.equals(this.warmupTasks)) {
+ warmupTasks.equals(this.warmupTasks) &&
+ isGroupReady == this.isGroupReady
+ ) {
return Optional.empty();
}
long nextLocalEpoch = localEpoch + 1;
- return Optional.of(new LocalAssignment(nextLocalEpoch,
activeTasks, standbyTasks, warmupTasks));
+ return Optional.of(new LocalAssignment(nextLocalEpoch,
activeTasks, standbyTasks, warmupTasks, isGroupReady));
}
@Override
@@ -123,6 +130,7 @@ public class StreamsMembershipManager implements
RequestManager {
", activeTasks=" + activeTasks +
", standbyTasks=" + standbyTasks +
", warmupTasks=" + warmupTasks +
+ ", isGroupReady=" + isGroupReady +
'}';
}
@@ -134,12 +142,13 @@ public class StreamsMembershipManager implements
RequestManager {
return localEpoch == that.localEpoch &&
Objects.equals(activeTasks, that.activeTasks) &&
Objects.equals(standbyTasks, that.standbyTasks) &&
- Objects.equals(warmupTasks, that.warmupTasks);
+ Objects.equals(warmupTasks, that.warmupTasks) &&
+ isGroupReady == that.isGroupReady;
}
@Override
public int hashCode() {
- return Objects.hash(localEpoch, activeTasks, standbyTasks,
warmupTasks);
+ return Objects.hash(localEpoch, activeTasks, standbyTasks,
warmupTasks, isGroupReady);
}
}
@@ -686,9 +695,9 @@ public class StreamsMembershipManager implements
RequestManager {
final List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks =
responseData.activeTasks();
final List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks =
responseData.standbyTasks();
final List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks =
responseData.warmupTasks();
+ final boolean isGroupReady = isGroupReady(responseData.status());
if (activeTasks != null && standbyTasks != null && warmupTasks !=
null) {
-
if (!state.canHandleNewAssignment()) {
log.debug("Ignoring new assignment: active tasks {}, standby
tasks {}, and warm-up tasks {} received " +
"from server because member is in {} state.",
@@ -699,17 +708,39 @@ public class StreamsMembershipManager implements
RequestManager {
processAssignmentReceived(
toTasksAssignment(activeTasks),
toTasksAssignment(standbyTasks),
- toTasksAssignment(warmupTasks)
+ toTasksAssignment(warmupTasks),
+ isGroupReady
);
- } else {
- if (responseData.activeTasks() != null ||
- responseData.standbyTasks() != null ||
- responseData.warmupTasks() != null) {
+ } else if (responseData.activeTasks() != null ||
responseData.standbyTasks() != null || responseData.warmupTasks() != null) {
+ throw new IllegalStateException("Invalid response data, task
collections must be all null or all non-null: "
+ + responseData);
+ } else if (isGroupReady != targetAssignment.isGroupReady) {
+ // If the client did not provide a new assignment, but the group
is now ready or not ready anymore, so
+ // update the target assignment and reconcile it.
+ processAssignmentReceived(
+ targetAssignment.activeTasks,
+ targetAssignment.standbyTasks,
+ targetAssignment.warmupTasks,
+ isGroupReady
+ );
+ }
+ }
- throw new IllegalStateException("Invalid response data, task
collections must be all null or all non-null: "
- + responseData);
+ private boolean
isGroupReady(List<StreamsGroupHeartbeatResponseData.Status> statuses) {
+ if (statuses != null) {
+ for (final StreamsGroupHeartbeatResponseData.Status status :
statuses) {
+ switch
(StreamsGroupHeartbeatResponse.Status.fromCode(status.statusCode())) {
+ case MISSING_SOURCE_TOPICS:
+ case MISSING_INTERNAL_TOPICS:
+ case INCORRECTLY_PARTITIONED_TOPICS:
+ case ASSIGNMENT_DELAYED:
+ return false;
+ default:
+ // continue checking other statuses
+ }
}
}
+ return true;
}
/**
@@ -952,11 +983,13 @@ public class StreamsMembershipManager implements
RequestManager {
* @param activeTasks Target active tasks assignment received from the
broker.
* @param standbyTasks Target standby tasks assignment received from the
broker.
* @param warmupTasks Target warm-up tasks assignment received from the
broker.
+ * @param isGroupReady True if the group is ready, false otherwise.
*/
private void processAssignmentReceived(Map<String, SortedSet<Integer>>
activeTasks,
Map<String, SortedSet<Integer>>
standbyTasks,
- Map<String, SortedSet<Integer>>
warmupTasks) {
- replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks,
warmupTasks);
+ Map<String, SortedSet<Integer>>
warmupTasks,
+ boolean isGroupReady) {
+ replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks,
warmupTasks, isGroupReady);
if (!targetAssignmentReconciled()) {
transitionTo(MemberState.RECONCILING);
} else {
@@ -975,8 +1008,9 @@ public class StreamsMembershipManager implements
RequestManager {
private void replaceTargetAssignmentWithNewAssignment(Map<String,
SortedSet<Integer>> activeTasks,
Map<String,
SortedSet<Integer>> standbyTasks,
- Map<String,
SortedSet<Integer>> warmupTasks) {
- targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks)
+ Map<String,
SortedSet<Integer>> warmupTasks,
+ boolean
isGroupReady) {
+ targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks,
isGroupReady)
.ifPresent(updatedAssignment -> {
log.debug("Target assignment updated from {} to {}. Member
will reconcile it on the next poll.",
targetAssignment, updatedAssignment);
@@ -1025,8 +1059,9 @@ public class StreamsMembershipManager implements
RequestManager {
SortedSet<StreamsRebalanceData.TaskId> ownedStandbyTasks =
toTaskIdSet(currentAssignment.standbyTasks);
SortedSet<StreamsRebalanceData.TaskId> assignedWarmupTasks =
toTaskIdSet(targetAssignment.warmupTasks);
SortedSet<StreamsRebalanceData.TaskId> ownedWarmupTasks =
toTaskIdSet(currentAssignment.warmupTasks);
+ boolean isGroupReady = targetAssignment.isGroupReady;
- log.info("Assigned tasks with local epoch {}\n" +
+ log.info("Assigned tasks with local epoch {} and group {}\n" +
"\tMember: {}\n" +
"\tAssigned active tasks: {}\n" +
"\tOwned active tasks: {}\n" +
@@ -1036,6 +1071,7 @@ public class StreamsMembershipManager implements
RequestManager {
"\tAssigned warm-up tasks: {}\n" +
"\tOwned warm-up tasks: {}\n",
targetAssignment.localEpoch,
+ isGroupReady ? "is ready" : "is not ready",
memberId,
assignedActiveTasks,
ownedActiveTasks,
@@ -1064,7 +1100,7 @@ public class StreamsMembershipManager implements
RequestManager {
final CompletableFuture<Void> tasksRevokedAndAssigned =
tasksRevoked.thenCompose(__ -> {
if (!maybeAbortReconciliation()) {
- return assignTasks(assignedActiveTasks, ownedActiveTasks,
assignedStandbyTasks, assignedWarmupTasks);
+ return assignTasks(assignedActiveTasks, ownedActiveTasks,
assignedStandbyTasks, assignedWarmupTasks, isGroupReady);
}
return CompletableFuture.completedFuture(null);
});
@@ -1117,7 +1153,8 @@ public class StreamsMembershipManager implements
RequestManager {
private CompletableFuture<Void> assignTasks(final
SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign,
final
SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks,
final
SortedSet<StreamsRebalanceData.TaskId> standbyTasksToAssign,
- final
SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign) {
+ final
SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign,
+ final boolean isGroupReady) {
log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up
tasks {{}} to the member.",
activeTasksToAssign.stream()
.map(StreamsRebalanceData.TaskId::toString)
@@ -1145,7 +1182,8 @@ public class StreamsMembershipManager implements
RequestManager {
new StreamsRebalanceData.Assignment(
activeTasksToAssign,
standbyTasksToAssign,
- warmupTasksToAssign
+ warmupTasksToAssign,
+ isGroupReady
)
);
onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index c6fe1fd9215..ac3c53c47d9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -157,18 +157,23 @@ public class StreamsRebalanceData {
private final Set<TaskId> warmupTasks;
+ private final boolean isGroupReady;
+
private Assignment() {
this.activeTasks = Set.of();
this.standbyTasks = Set.of();
this.warmupTasks = Set.of();
+ this.isGroupReady = false;
}
public Assignment(final Set<TaskId> activeTasks,
final Set<TaskId> standbyTasks,
- final Set<TaskId> warmupTasks) {
+ final Set<TaskId> warmupTasks,
+ final boolean isGroupReady) {
this.activeTasks = Set.copyOf(Objects.requireNonNull(activeTasks,
"Active tasks cannot be null"));
this.standbyTasks =
Set.copyOf(Objects.requireNonNull(standbyTasks, "Standby tasks cannot be
null"));
this.warmupTasks = Set.copyOf(Objects.requireNonNull(warmupTasks,
"Warmup tasks cannot be null"));
+ this.isGroupReady = isGroupReady;
}
public Set<TaskId> activeTasks() {
@@ -183,6 +188,10 @@ public class StreamsRebalanceData {
return warmupTasks;
}
+ public boolean isGroupReady() {
+ return isGroupReady;
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -194,16 +203,17 @@ public class StreamsRebalanceData {
final Assignment that = (Assignment) o;
return Objects.equals(activeTasks, that.activeTasks)
&& Objects.equals(standbyTasks, that.standbyTasks)
- && Objects.equals(warmupTasks, that.warmupTasks);
+ && Objects.equals(warmupTasks, that.warmupTasks)
+ && isGroupReady == that.isGroupReady;
}
@Override
public int hashCode() {
- return Objects.hash(activeTasks, standbyTasks, warmupTasks);
+ return Objects.hash(activeTasks, standbyTasks, warmupTasks,
isGroupReady);
}
public Assignment copy() {
- return new Assignment(activeTasks, standbyTasks, warmupTasks);
+ return new Assignment(activeTasks, standbyTasks, warmupTasks,
isGroupReady);
}
@Override
@@ -212,6 +222,7 @@ public class StreamsRebalanceData {
"activeTasks=" + activeTasks +
", standbyTasks=" + standbyTasks +
", warmupTasks=" + warmupTasks +
+ ", isGroupReady=" + isGroupReady +
'}';
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
index 32fe55f12cd..87c10a98d37 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -81,7 +82,18 @@ public class StreamsGroupHeartbeatResponse extends
AbstractResponse {
MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing
or a source topic regex resolves to zero topics."),
INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected
to be copartitioned are not copartitioned."),
MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are
missing."),
- SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the
whole application.");
+ SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the
whole application."),
+ ASSIGNMENT_DELAYED((byte) 5, "The assignment was delayed by the
coordinator.");
+
+ private static final Map<Byte, Status> CODE_TO_STATUS;
+
+ static {
+ Map<Byte, Status> map = new HashMap<>();
+ for (Status status : values()) {
+ map.put(status.code, status);
+ }
+ CODE_TO_STATUS = Collections.unmodifiableMap(map);
+ }
private final byte code;
private final String message;
@@ -98,5 +110,13 @@ public class StreamsGroupHeartbeatResponse extends
AbstractResponse {
public String message() {
return message;
}
+
+ public static Status fromCode(byte code) {
+ Status status = CODE_TO_STATUS.get(code);
+ if (status == null) {
+ throw new IllegalArgumentException("Unknown code " + code);
+ }
+ return status;
+ }
}
}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 220c9704a23..27cf47bb1a4 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -94,6 +94,7 @@
// The group coordinator will
attempt to create all missing internal topics, if any errors occur during
// topic creation, this will be
indicated in StatusDetail.
// 4 - SHUTDOWN_APPLICATION - A client requested the shutdown
of the whole application.
+ // 5 - ASSIGNMENT_DELAYED - No assignment was provided
because assignment computation was delayed.
{ "name": "StatusCode", "type": "int8", "versions": "0+",
"about": "A code to indicate that a particular status is active for
the group membership" },
{ "name": "StatusDetail", "type": "string", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 9e4b8437144..35392e99398 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -821,7 +821,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
- )
+ ),
+ true
)
);
@@ -870,7 +871,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
),
Set.of(
- )
+ ),
+ true
)
);
@@ -923,7 +925,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
- )
+ ),
+ true
)
);
StreamsGroupHeartbeatRequestData requestDataBeforeReset =
heartbeatState.buildRequestData();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index c0255a4d29a..50e582f5cc4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -72,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -1937,6 +1938,242 @@ public class StreamsMembershipManagerTest {
verifyInStateUnsubscribed(membershipManager);
}
+ @Test
+ public void testIsGroupReadyWithMissingSourceTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("One or more source topics are missing.")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithMissingInternalTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+ .setStatusDetail("One or more internal topics are missing.")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithIncorrectlyPartitionedTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
+ .setStatusDetail("One or more topics expected to be
copartitioned are not copartitioned.")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithAssignmentDelayedStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the configured
initial rebalance delay.")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithNoStatuses() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ null
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ true
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithOtherStatuses() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+ .setStatusDetail("The topology epoch supplied is inconsistent
with the topology for this streams group.")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ true
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyChangeWhenTasksAreNull() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final StreamsGroupHeartbeatResponse responseWithTasks =
makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ null
+ );
+
+ membershipManager.onHeartbeatSuccess(responseWithTasks);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future1 =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ true
+ );
+ future1.complete(null);
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the configured
initial rebalance delay.")
+ );
+
+ final StreamsGroupHeartbeatResponse responseWithoutTasks =
makeHeartbeatResponse(
+ null,
+ null,
+ null,
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(responseWithoutTasks);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future2 =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+ future2.complete(null);
+ }
+
private void verifyThatNoTasksHaveBeenRevoked() {
verify(backgroundEventHandler,
never()).add(any(StreamsOnTasksRevokedCallbackNeededEvent.class));
verify(subscriptionState, never()).markPendingRevocation(any());
@@ -2036,9 +2273,16 @@ public class StreamsMembershipManagerTest {
private CompletableFuture<Void>
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final
Set<StreamsRebalanceData.TaskId> activeTasks,
final Set<StreamsRebalanceData.TaskId> standbyTasks,
final Set<StreamsRebalanceData.TaskId> warmupTasks) {
+ return
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks,
standbyTasks, warmupTasks, true);
+ }
+
+ private CompletableFuture<Void>
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(final
Set<StreamsRebalanceData.TaskId> activeTasks,
+
final Set<StreamsRebalanceData.TaskId> standbyTasks,
+
final Set<StreamsRebalanceData.TaskId> warmupTasks,
+
final boolean isGroupReady) {
verify(backgroundEventHandler,
times(++onTasksAssignedCallbackNeededAddCount)).add(onTasksAssignedCallbackNeededEventCaptor.capture());
final StreamsOnTasksAssignedCallbackNeededEvent
onTasksAssignedCallbackNeeded =
onTasksAssignedCallbackNeededEventCaptor.getValue();
- assertEquals(makeTaskAssignment(activeTasks, standbyTasks,
warmupTasks), onTasksAssignedCallbackNeeded.assignment());
+ assertEquals(makeTaskAssignment(activeTasks, standbyTasks,
warmupTasks, isGroupReady), onTasksAssignedCallbackNeeded.assignment());
return onTasksAssignedCallbackNeeded.future();
}
@@ -2072,7 +2316,7 @@ public class StreamsMembershipManagerTest {
private void
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(final String
subtopologyId,
final String topicName) {
- when(streamsRebalanceData.subtopologies()).thenReturn(
+ lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
mkMap(
mkEntry(
subtopologyId,
@@ -2092,7 +2336,7 @@ public class StreamsMembershipManagerTest {
final String
topicName1,
final String
subtopologyId2,
final String
topicName2) {
- when(streamsRebalanceData.subtopologies()).thenReturn(
+ lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
mkMap(
mkEntry(
subtopologyId1,
@@ -2199,6 +2443,14 @@ public class StreamsMembershipManagerTest {
final
List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks,
final
List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks,
final int
memberEpoch) {
+ return makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks,
memberEpoch, null);
+ }
+
+ private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final
List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks,
+ final
List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks,
+ final
List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks,
+ final int
memberEpoch,
+ final
List<StreamsGroupHeartbeatResponseData.Status> statuses) {
final StreamsGroupHeartbeatResponseData responseData = new
StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(membershipManager.memberId())
@@ -2206,16 +2458,27 @@ public class StreamsMembershipManagerTest {
.setActiveTasks(activeTasks)
.setStandbyTasks(standbyTasks)
.setWarmupTasks(warmupTasks);
+ if (statuses != null) {
+ responseData.setStatus(statuses);
+ }
return new StreamsGroupHeartbeatResponse(responseData);
}
private StreamsRebalanceData.Assignment makeTaskAssignment(final
Set<StreamsRebalanceData.TaskId> activeTasks,
final
Set<StreamsRebalanceData.TaskId> standbyTasks,
final
Set<StreamsRebalanceData.TaskId> warmupTasks) {
+ return makeTaskAssignment(activeTasks, standbyTasks, warmupTasks,
true);
+ }
+
+ private StreamsRebalanceData.Assignment makeTaskAssignment(final
Set<StreamsRebalanceData.TaskId> activeTasks,
+ final
Set<StreamsRebalanceData.TaskId> standbyTasks,
+ final
Set<StreamsRebalanceData.TaskId> warmupTasks,
+ final boolean
isGroupReady) {
return new StreamsRebalanceData.Assignment(
activeTasks,
standbyTasks,
- warmupTasks
+ warmupTasks,
+ isGroupReady
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
index f2376640c01..237dd4f0993 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java
@@ -92,7 +92,8 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData.Assignment assignment = new
StreamsRebalanceData.Assignment(
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
- Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+ Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+ true
);
assertThrows(
@@ -111,11 +112,11 @@ public class StreamsRebalanceDataTest {
@Test
public void assignmentShouldNotAcceptNulls() {
- final Exception exception1 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of()));
+ final Exception exception1 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(null, Set.of(), Set.of(), true));
assertEquals("Active tasks cannot be null", exception1.getMessage());
- final Exception exception2 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of()));
+ final Exception exception2 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(Set.of(), null, Set.of(), true));
assertEquals("Standby tasks cannot be null", exception2.getMessage());
- final Exception exception3 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null));
+ final Exception exception3 = assertThrows(NullPointerException.class,
() -> new StreamsRebalanceData.Assignment(Set.of(), Set.of(), null, true));
assertEquals("Warmup tasks cannot be null", exception3.getMessage());
}
@@ -125,43 +126,56 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData.Assignment assignment = new
StreamsRebalanceData.Assignment(
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
- Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+ Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+ true
);
final StreamsRebalanceData.Assignment assignmentEqual = new
StreamsRebalanceData.Assignment(
assignment.activeTasks(),
assignment.standbyTasks(),
- assignment.warmupTasks()
+ assignment.warmupTasks(),
+ assignment.isGroupReady()
);
Set<StreamsRebalanceData.TaskId> unequalActiveTasks = new
HashSet<>(assignment.activeTasks());
unequalActiveTasks.add(additionalTask);
final StreamsRebalanceData.Assignment assignmentUnequalActiveTasks =
new StreamsRebalanceData.Assignment(
unequalActiveTasks,
assignment.standbyTasks(),
- assignment.warmupTasks()
+ assignment.warmupTasks(),
+ assignment.isGroupReady()
);
Set<StreamsRebalanceData.TaskId> unequalStandbyTasks = new
HashSet<>(assignment.standbyTasks());
unequalStandbyTasks.add(additionalTask);
final StreamsRebalanceData.Assignment assignmentUnequalStandbyTasks =
new StreamsRebalanceData.Assignment(
assignment.activeTasks(),
unequalStandbyTasks,
- assignment.warmupTasks()
+ assignment.warmupTasks(),
+ assignment.isGroupReady()
);
Set<StreamsRebalanceData.TaskId> unequalWarmupTasks = new
HashSet<>(assignment.warmupTasks());
unequalWarmupTasks.add(additionalTask);
final StreamsRebalanceData.Assignment assignmentUnequalWarmupTasks =
new StreamsRebalanceData.Assignment(
assignment.activeTasks(),
assignment.standbyTasks(),
- unequalWarmupTasks
+ unequalWarmupTasks,
+ assignment.isGroupReady()
+ );
+ final StreamsRebalanceData.Assignment assignmentUnequalIsGroupReady =
new StreamsRebalanceData.Assignment(
+ assignment.activeTasks(),
+ assignment.standbyTasks(),
+ assignment.warmupTasks(),
+ !assignment.isGroupReady()
);
assertEquals(assignment, assignmentEqual);
assertNotEquals(assignment, assignmentUnequalActiveTasks);
assertNotEquals(assignment, assignmentUnequalStandbyTasks);
assertNotEquals(assignment, assignmentUnequalWarmupTasks);
+ assertNotEquals(assignment, assignmentUnequalIsGroupReady);
assertEquals(assignment.hashCode(), assignmentEqual.hashCode());
assertNotEquals(assignment.hashCode(),
assignmentUnequalActiveTasks.hashCode());
assertNotEquals(assignment.hashCode(),
assignmentUnequalStandbyTasks.hashCode());
assertNotEquals(assignment.hashCode(),
assignmentUnequalWarmupTasks.hashCode());
+ assertNotEquals(assignment.hashCode(),
assignmentUnequalIsGroupReady.hashCode());
}
@Test
@@ -169,7 +183,8 @@ public class StreamsRebalanceDataTest {
final StreamsRebalanceData.Assignment assignment = new
StreamsRebalanceData.Assignment(
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 1)),
Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 2)),
- Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3))
+ Set.of(new StreamsRebalanceData.TaskId("subtopologyId1", 3)),
+ true
);
final StreamsRebalanceData.Assignment copy = assignment.copy();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
index 749a4594ab8..b7c7275df58 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceListenerInvokerTest.java
@@ -60,7 +60,7 @@ public class StreamsRebalanceListenerInvokerTest {
@Test
public void testSetRebalanceListenerWithNull() {
- NullPointerException exception =
assertThrows(NullPointerException.class,
+ NullPointerException exception =
assertThrows(NullPointerException.class,
() -> invoker.setRebalanceListener(null));
assertEquals("StreamsRebalanceListener cannot be null",
exception.getMessage());
}
@@ -96,12 +96,12 @@ public class StreamsRebalanceListenerInvokerTest {
@Test
public void testInvokeAllTasksRevokedWithListener() {
invoker.setRebalanceListener(mockListener);
-
+
StreamsRebalanceData.Assignment mockAssignment =
createMockAssignment();
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
Exception result = invoker.invokeAllTasksRevoked();
-
+
assertNull(result);
verify(mockListener).onTasksRevoked(eq(mockAssignment.activeTasks()));
}
@@ -112,7 +112,7 @@ public class StreamsRebalanceListenerInvokerTest {
StreamsRebalanceData.Assignment assignment = createMockAssignment();
Exception result = invoker.invokeTasksAssigned(assignment);
-
+
assertNull(result);
verify(mockListener).onTasksAssigned(eq(assignment));
}
@@ -123,10 +123,10 @@ public class StreamsRebalanceListenerInvokerTest {
StreamsRebalanceData.Assignment assignment = createMockAssignment();
WakeupException wakeupException = new WakeupException();
doThrow(wakeupException).when(mockListener).onTasksAssigned(assignment);
-
- WakeupException thrownException = assertThrows(WakeupException.class,
+
+ WakeupException thrownException = assertThrows(WakeupException.class,
() -> invoker.invokeTasksAssigned(assignment));
-
+
assertEquals(wakeupException, thrownException);
verify(mockListener).onTasksAssigned(eq(assignment));
}
@@ -137,10 +137,10 @@ public class StreamsRebalanceListenerInvokerTest {
StreamsRebalanceData.Assignment assignment = createMockAssignment();
InterruptException interruptException = new InterruptException("Test
interrupt");
doThrow(interruptException).when(mockListener).onTasksAssigned(assignment);
-
- InterruptException thrownException =
assertThrows(InterruptException.class,
+
+ InterruptException thrownException =
assertThrows(InterruptException.class,
() -> invoker.invokeTasksAssigned(assignment));
-
+
assertEquals(interruptException, thrownException);
verify(mockListener).onTasksAssigned(eq(assignment));
}
@@ -151,9 +151,9 @@ public class StreamsRebalanceListenerInvokerTest {
StreamsRebalanceData.Assignment assignment = createMockAssignment();
RuntimeException runtimeException = new RuntimeException("Test
exception");
doThrow(runtimeException).when(mockListener).onTasksAssigned(assignment);
-
+
Exception result = invoker.invokeTasksAssigned(assignment);
-
+
assertEquals(runtimeException, result);
verify(mockListener).onTasksAssigned(eq(assignment));
}
@@ -164,7 +164,7 @@ public class StreamsRebalanceListenerInvokerTest {
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
Exception result = invoker.invokeTasksRevoked(tasks);
-
+
assertNull(result);
verify(mockListener).onTasksRevoked(eq(tasks));
}
@@ -175,10 +175,10 @@ public class StreamsRebalanceListenerInvokerTest {
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
WakeupException wakeupException = new WakeupException();
doThrow(wakeupException).when(mockListener).onTasksRevoked(tasks);
-
- WakeupException thrownException = assertThrows(WakeupException.class,
+
+ WakeupException thrownException = assertThrows(WakeupException.class,
() -> invoker.invokeTasksRevoked(tasks));
-
+
assertEquals(wakeupException, thrownException);
verify(mockListener).onTasksRevoked(eq(tasks));
}
@@ -189,10 +189,10 @@ public class StreamsRebalanceListenerInvokerTest {
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
InterruptException interruptException = new InterruptException("Test
interrupt");
doThrow(interruptException).when(mockListener).onTasksRevoked(tasks);
-
- InterruptException thrownException =
assertThrows(InterruptException.class,
+
+ InterruptException thrownException =
assertThrows(InterruptException.class,
() -> invoker.invokeTasksRevoked(tasks));
-
+
assertEquals(interruptException, thrownException);
verify(mockListener).onTasksRevoked(eq(tasks));
}
@@ -203,9 +203,9 @@ public class StreamsRebalanceListenerInvokerTest {
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
RuntimeException runtimeException = new RuntimeException("Test
exception");
doThrow(runtimeException).when(mockListener).onTasksRevoked(tasks);
-
+
Exception result = invoker.invokeTasksRevoked(tasks);
-
+
assertEquals(runtimeException, result);
verify(mockListener).onTasksRevoked(eq(tasks));
}
@@ -215,7 +215,7 @@ public class StreamsRebalanceListenerInvokerTest {
invoker.setRebalanceListener(mockListener);
Exception result = invoker.invokeAllTasksLost();
-
+
assertNull(result);
verify(mockListener).onAllTasksLost();
}
@@ -225,10 +225,10 @@ public class StreamsRebalanceListenerInvokerTest {
invoker.setRebalanceListener(mockListener);
WakeupException wakeupException = new WakeupException();
doThrow(wakeupException).when(mockListener).onAllTasksLost();
-
- WakeupException thrownException = assertThrows(WakeupException.class,
+
+ WakeupException thrownException = assertThrows(WakeupException.class,
() -> invoker.invokeAllTasksLost());
-
+
assertEquals(wakeupException, thrownException);
verify(mockListener).onAllTasksLost();
}
@@ -238,10 +238,10 @@ public class StreamsRebalanceListenerInvokerTest {
invoker.setRebalanceListener(mockListener);
InterruptException interruptException = new InterruptException("Test
interrupt");
doThrow(interruptException).when(mockListener).onAllTasksLost();
-
- InterruptException thrownException =
assertThrows(InterruptException.class,
+
+ InterruptException thrownException =
assertThrows(InterruptException.class,
() -> invoker.invokeAllTasksLost());
-
+
assertEquals(interruptException, thrownException);
verify(mockListener).onAllTasksLost();
}
@@ -251,9 +251,9 @@ public class StreamsRebalanceListenerInvokerTest {
invoker.setRebalanceListener(mockListener);
RuntimeException runtimeException = new RuntimeException("Test
exception");
doThrow(runtimeException).when(mockListener).onAllTasksLost();
-
+
Exception result = invoker.invokeAllTasksLost();
-
+
assertEquals(runtimeException, result);
verify(mockListener).onAllTasksLost();
}
@@ -262,8 +262,8 @@ public class StreamsRebalanceListenerInvokerTest {
Set<StreamsRebalanceData.TaskId> activeTasks = createMockTasks();
Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of();
Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of();
-
- return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks,
warmupTasks);
+
+ return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks,
warmupTasks, true);
}
private Set<StreamsRebalanceData.TaskId> createMockTasks() {
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d7a54ec2e2..5ce46083341 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -3847,6 +3847,9 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val response = sendRequestAndVerifyResponseError(request, resource,
isAuthorized = true).asInstanceOf[StreamsGroupHeartbeatResponse]
assertEquals(
util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the configured initial
rebalance delay."),
+ new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Internal topics are missing: [topic]; Unauthorized
to CREATE on topics topic.")),
response.data().status())
@@ -3878,6 +3881,9 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
// Request successful, and no internal topic creation error.
assertEquals(
util.List.of(new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the configured initial
rebalance delay."),
+ new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Internal topics are missing: [topic]")),
response.data().status())
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index b0e62b49899..77b47dc00d5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2068,6 +2068,12 @@ public class GroupMetadataManager {
// During initial rebalance delay, return empty assignment to
first joining members.
targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
targetAssignment = TasksTuple.EMPTY;
+
+ returnedStatus.add(
+ new Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
+ );
} else {
targetAssignment = updateStreamsTargetAssignment(
group,
@@ -2155,6 +2161,7 @@ public class GroupMetadataManager {
));
response.setStatus(returnedStatus);
+
return new CoordinatorResult<>(records, new
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 213f16affa4..769b282403c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -17581,7 +17581,11 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setPartitionsByUserEndpoint(null)
.setEndpointInformationEpoch(0)
- .setStatus(List.of()),
+ .setStatus(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
+ )),
result.response().data()
);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 121b6d5217a..df993ac81ad 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -247,6 +247,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
return "org.apache.kafka." + group + "." + name;
}).filter(name ->
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter
filters out string metrics
.sorted().toList();
+ TestUtils.waitForCondition(
+ () ->
TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId).size()
== expectedMetrics.size(),
+ 30_000,
+ "Never received enough metrics"
+ );
final List<String> actualMetrics = new
ArrayList<>(TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index a56723abc33..b31cee581b0 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
@@ -48,15 +49,17 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -339,8 +342,13 @@ public class MetricsIntegrationTest {
() -> "Kafka Streams application did not reach state NOT_RUNNING
in " + timeout + " ms");
}
- @Test
- public void shouldAddMetricsOnAllLevels() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void shouldAddMetricsOnAllLevels(final boolean
streamsProtocolEnabled) throws Exception {
+ if (streamsProtocolEnabled) {
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
+
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(),
Serdes.String()));
builder.table(STREAM_OUTPUT_1,
@@ -373,8 +381,13 @@ public class MetricsIntegrationTest {
checkMetricsDeregistration();
}
- @Test
- public void shouldAddMetricsForWindowStoreAndSuppressionBuffer() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void shouldAddMetricsForWindowStoreAndSuppressionBuffer(final
boolean streamsProtocolEnabled) throws Exception {
+ if (streamsProtocolEnabled) {
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
+
final Duration windowSize = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.groupByKey()
@@ -401,8 +414,13 @@ public class MetricsIntegrationTest {
checkMetricsDeregistration();
}
- @Test
- public void shouldAddMetricsForSessionStore() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void shouldAddMetricsForSessionStore(final boolean
streamsProtocolEnabled) throws Exception {
+ if (streamsProtocolEnabled) {
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
+
final Duration inactivityGap = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.groupByKey()
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 54aa5dd5f0d..d77355831bc 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -42,15 +43,17 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -142,9 +145,13 @@ public class RocksDBMetricsIntegrationTest {
void verify(final KafkaStreams kafkaStreams, final String metricScope)
throws Exception;
}
- @Test
- public void
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final TestInfo
testInfo) throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir(final boolean
streamsProtocolEnabled, final TestInfo testInfo) throws Exception {
final Properties streamsConfiguration = streamsConfig(testInfo);
+ if (streamsProtocolEnabled) {
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final StreamsBuilder builder = builderForStateStores();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
index 41accecb1b1..1cdfcba6810 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java
@@ -96,6 +96,7 @@ public class DefaultStreamsRebalanceListener implements
StreamsRebalanceListener
log.info("Processing new assignment {} from Streams Rebalance
Protocol", assignment);
try {
+ streamThread.setStreamsGroupReady(assignment.isGroupReady());
taskManager.handleAssignment(activeTasksWithPartitions,
standbyTasksWithPartitions);
streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
taskManager.handleRebalanceComplete();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 84c4f51734a..405ea720b6e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -339,6 +339,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
private long lastPurgeMs;
private long lastPartitionAssignedMs = -1L;
private int numIterations;
+ private boolean streamsGroupReady = false;
private volatile State state = State.CREATED;
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;
@@ -1163,7 +1164,15 @@ public class StreamThread extends Thread implements
ProcessingThread {
pollLatency = pollPhase();
totalPolledSinceLastSummary += 1;
- handleStreamsRebalanceData();
+ if (streamsRebalanceData.isPresent()) {
+ // Always handle status codes (e.g., MISSING_SOURCE_TOPICS,
INCORRECTLY_PARTITIONED_TOPICS)
+ // regardless of streamsGroupReady, as these may throw exceptions
that need to be handled.
+ handleStreamsRebalanceData();
+
+ if (!streamsGroupReady) {
+ return;
+ }
+ }
// Shutdown hook could potentially be triggered and transit the thread
state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the
state transition happens during #onPartitionsAssigned().
@@ -1307,7 +1316,15 @@ public class StreamThread extends Thread implements
ProcessingThread {
taskManager.resumePollingForPartitionsWithAvailableSpace();
pollLatency = pollPhase();
- handleStreamsRebalanceData();
+ if (streamsRebalanceData.isPresent()) {
+ // Always handle status codes (e.g., MISSING_SOURCE_TOPICS,
INCORRECTLY_PARTITIONED_TOPICS)
+ // regardless of streamsGroupReady, as these may throw exceptions
that need to be handled.
+ handleStreamsRebalanceData();
+
+ if (!streamsGroupReady) {
+ return;
+ }
+ }
// Shutdown hook could potentially be triggered and transit the thread
state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the
state transition happens during #onPartitionsAssigned().
@@ -1487,6 +1504,17 @@ public class StreamThread extends Thread implements
ProcessingThread {
return records;
}
+ /**
+ * Sets the readiness state of the Streams group for this thread.
+ *
+ * @param ready {@code true} if the Streams group is ready to process
records; {@code false} otherwise.
+ * When set to {@code true}, this thread may transition to an
active processing state.
+ * When set to {@code false}, the thread will not process
records until the group is ready.
+ */
+ public void setStreamsGroupReady(final boolean ready) {
+ streamsGroupReady = ready;
+ }
+
public void handleStreamsRebalanceData() {
if (streamsRebalanceData.isPresent()) {
boolean hasMissingSourceTopics = false;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
index 0e9e013f628..81103b3a2ea 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java
@@ -173,12 +173,14 @@ public class DefaultStreamsRebalanceListenerTest {
final StreamsRebalanceData.Assignment assignment = new
StreamsRebalanceData.Assignment(
Set.of(new StreamsRebalanceData.TaskId("1", 0)),
Set.of(new StreamsRebalanceData.TaskId("2", 0)),
- Set.of(new StreamsRebalanceData.TaskId("3", 0))
+ Set.of(new StreamsRebalanceData.TaskId("3", 0)),
+ false
);
assertDoesNotThrow(() ->
defaultStreamsRebalanceListener.onTasksAssigned(assignment));
final InOrder inOrder = inOrder(taskManager, streamThread,
streamsRebalanceData);
+ inOrder.verify(streamThread).setStreamsGroupReady(false);
inOrder.verify(taskManager).handleAssignment(
Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0),
new TopicPartition("repartition1", 0))),
Map.of(
@@ -201,10 +203,11 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
final Exception actualException = assertThrows(RuntimeException.class,
() -> defaultStreamsRebalanceListener.onTasksAssigned(
- new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+ new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(),
false)
));
assertEquals(exception, actualException);
+ verify(streamThread).setStreamsGroupReady(false);
verify(taskManager).handleAssignment(any(), any());
verify(streamThread,
never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
verify(taskManager, never()).handleRebalanceComplete();
@@ -302,11 +305,13 @@ public class DefaultStreamsRebalanceListenerTest {
new StreamsRebalanceData.Assignment(
Set.of(new StreamsRebalanceData.TaskId("1", 0)),
Set.of(),
- Set.of()
+ Set.of(),
+ true
)
);
verify(tasksAssignedSensor).record(150L);
+ verify(streamThread).setStreamsGroupReady(true);
verify(taskManager).handleAssignment(
Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0),
new TopicPartition("repartition1", 0))),
Map.of()
@@ -376,10 +381,11 @@ public class DefaultStreamsRebalanceListenerTest {
createRebalanceListenerWithRebalanceData(new
StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
assertThrows(RuntimeException.class, () ->
defaultStreamsRebalanceListener.onTasksAssigned(
- new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
+ new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(),
false)
));
verify(tasksAssignedSensor).record(75L);
+ verify(streamThread).setStreamsGroupReady(false);
verify(taskManager).handleAssignment(any(), any());
}