This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 678d456ad7d KAFKA-19044: Handle tasks that are not present in the
current topology (#19722)
678d456ad7d is described below
commit 678d456ad7d8bf0d2a3ab72e5e419a70e3c6b91a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 4 20:22:52 2025 +0200
KAFKA-19044: Handle tasks that are not present in the current topology
(#19722)
A heartbeat might be sent to the group coordinator, claiming to own
tasks that we do not know about. We need some logic to handle those
requests. In KIP-1071, we propose to return `INVALID_REQUEST` error
whenever this happens, effectively letting the clients crash.
This behavior will, however, make topology updates impossible. Bruno
Cadonna proposed to only check that owned tasks match our set of
expected tasks if the topology epochs between the group and the client
match. The aim of this change is to implement a check and a behavior
for the first version of the protocol, which is to always return
`INVALID_REQUEST` if an unknown task is sent to the group coordinator.
We can relax this constraint once we allow topology updating with
topology epochs.
To efficiently check this whenever we receive a heartbeat containing
tasks, we precompute the number of tasks for each subtopology. This also
benefits the performance of the assignor.
Reviewers: Bill Bejeck <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 37 +++++++++++++
.../group/streams/TopologyMetadata.java | 23 +-------
.../group/streams/topics/ChangelogTopics.java | 6 ++-
.../streams/topics/ConfiguredSubtopology.java | 12 ++++-
.../group/streams/topics/InternalTopicManager.java | 19 ++++++-
.../group/GroupMetadataManagerTest.java | 62 ++++++++++++++++++++++
.../group/streams/TargetAssignmentBuilderTest.java | 2 +-
.../group/streams/TopologyMetadataTest.java | 17 +-----
.../streams/topics/ConfiguredSubtopologyTest.java | 19 ++++++-
.../topics/EndpointToPartitionsManagerTest.java | 8 +--
.../streams/topics/InternalTopicManagerTest.java | 2 +
11 files changed, 160 insertions(+), 47 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a3115c930a5..4f1f0775efd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -156,6 +156,7 @@ import
org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
@@ -195,6 +196,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
@@ -1665,6 +1667,34 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Validates that the requested tasks exist in the configured topology and
partitions are valid.
+ * If tasks is null, does nothing. If an invalid task is found, throws
InvalidRequestException.
+ *
+ * @param subtopologySortedMap The configured topology.
+ * @param tasks The list of requested tasks.
+ */
+ private static void throwIfRequestContainsInvalidTasks(
+ SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+ List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+ ) {
+ if (tasks == null || tasks.isEmpty()) return;
+ for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+ String subtopologyId = task.subtopologyId();
+ ConfiguredSubtopology subtopology =
subtopologySortedMap.get(subtopologyId);
+ if (subtopology == null) {
+ throw new InvalidRequestException("Subtopology " +
subtopologyId + " does not exist in the topology.");
+ }
+ int numTasks = subtopology.numberOfTasks();
+ for (Integer partition : task.partitions()) {
+ if (partition < 0 || partition >= numTasks) {
+ throw new InvalidRequestException("Task " + partition + "
for subtopology " + subtopologyId +
+ " is invalid. Number of tasks for this subtopology: "
+ numTasks);
+ }
+ }
+ }
+ }
+
/**
* Validates if the received classic member protocols are supported by the
group.
*
@@ -1917,6 +1947,13 @@ public class GroupMetadataManager {
updatedConfiguredTopology = group.configuredTopology().get();
}
+ if (updatedConfiguredTopology.isReady()) {
+ SortedMap<String, ConfiguredSubtopology> subtopologySortedMap =
updatedConfiguredTopology.subtopologies().get();
+ throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedActiveTasks);
+ throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedStandbyTasks);
+ throwIfRequestContainsInvalidTasks(subtopologySortedMap,
ownedWarmupTasks);
+ }
+
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
index 0241083233b..b91bfc98706 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.TopicImage;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.SortedMap;
-import java.util.stream.Stream;
/**
* The topology metadata class is used by the {@link
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic
and
@@ -42,14 +40,6 @@ public record TopologyMetadata(MetadataImage metadataImage,
SortedMap<String, Co
subtopologyMap =
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
}
- /**
- * @return The metadata image in topology metadata.
- */
- @Override
- public MetadataImage metadataImage() {
- return this.metadataImage;
- }
-
/**
* Checks whether the given subtopology is associated with a changelog
topic.
*
@@ -85,18 +75,7 @@ public record TopologyMetadata(MetadataImage metadataImage,
SortedMap<String, Co
@Override
public int maxNumInputPartitions(String subtopologyId) {
final ConfiguredSubtopology subtopology =
getSubtopologyOrFail(subtopologyId);
- return Stream.concat(
- subtopology.sourceTopics().stream(),
- subtopology.repartitionSourceTopics().keySet().stream()
- ).map(topic -> {
- TopicImage topicImage = metadataImage.topics().getTopic(topic);
- if (topicImage == null) {
- throw new IllegalStateException("Topic " + topic + " not found
in metadata image");
- }
- return topicImage.partitions().size();
- }).max(Integer::compareTo).orElseThrow(
- () -> new IllegalStateException("Subtopology does not contain any
source topics")
- );
+ return subtopology.numberOfTasks();
}
private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
index 8db69e2f2a3..b69858cd0ed 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
@@ -85,8 +85,10 @@ public class ChangelogTopics {
}
}
- log.debug("Expecting state changelog topic partitions {} for the
requested topology.",
- changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() +
":" + e.getValue()).collect(Collectors.joining(", ")));
+ if (!changelogTopicPartitions.isEmpty()) {
+ log.debug("Expecting state changelog topic partitions {} for the
requested topology.",
+ changelogTopicPartitions.entrySet().stream().map(e ->
e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")));
+ }
return changelogTopicPartitions;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
index 8ef41c4967e..41d977dc9f8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -31,17 +31,25 @@ import java.util.Set;
* <p>
* Configured subtopologies may be recreated every time the input topics used
by the subtopology are modified.
*
+ * @param numberOfTasks Precomputed number of tasks. Not that not
every source topic may have a partition for
+ * every task, in cases where there are
multiple source topics with an unequal number of
+ * partitions (e.g., one topic has 3 partitions
and another has 5 and both are used in a
+ * merge).
* @param sourceTopics The source topics of the subtopology.
* @param repartitionSourceTopics The repartition source topics of the
subtopology.
* @param repartitionSinkTopics The repartition sink topics of the
subtopology.
* @param stateChangelogTopics The state changelog topics of the
subtopology.
*/
-public record ConfiguredSubtopology(Set<String> sourceTopics,
+public record ConfiguredSubtopology(int numberOfTasks,
+ Set<String> sourceTopics,
Map<String, ConfiguredInternalTopic>
repartitionSourceTopics,
Set<String> repartitionSinkTopics,
Map<String, ConfiguredInternalTopic>
stateChangelogTopics) {
public ConfiguredSubtopology {
+ if (numberOfTasks <= 0) {
+ throw new IllegalArgumentException("Number of tasks must be
positive");
+ }
Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
Objects.requireNonNull(repartitionSourceTopics,
"repartitionSourceTopics can't be null");
Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics
can't be null");
@@ -61,4 +69,6 @@ public record ConfiguredSubtopology(Set<String> sourceTopics,
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
}
+
+
}
\ No newline at end of file
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 490289c2c85..c1845ca2e24 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -82,7 +82,7 @@ public class InternalTopicManager {
subtopologies.stream()
.collect(Collectors.toMap(
StreamsGroupTopologyValue.Subtopology::subtopologyId,
- x -> fromPersistedSubtopology(x,
decidedPartitionCountsForInternalTopics),
+ x -> fromPersistedSubtopology(x, topicsImage,
decidedPartitionCountsForInternalTopics),
(v1, v2) -> {
throw new
RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
},
@@ -264,9 +264,11 @@ public class InternalTopicManager {
}
private static ConfiguredSubtopology fromPersistedSubtopology(final
StreamsGroupTopologyValue.Subtopology subtopology,
+ final
TopicsImage topicsImage,
final
Map<String, Integer> decidedPartitionCountsForInternalTopics
) {
return new ConfiguredSubtopology(
+ computeNumberOfTasks(subtopology, topicsImage,
decidedPartitionCountsForInternalTopics),
new HashSet<>(subtopology.sourceTopics()),
subtopology.repartitionSourceTopics().stream()
.map(x -> fromPersistedTopicInfo(x,
decidedPartitionCountsForInternalTopics))
@@ -278,6 +280,21 @@ public class InternalTopicManager {
);
}
+ private static int computeNumberOfTasks(final
StreamsGroupTopologyValue.Subtopology subtopology,
+ final TopicsImage topicsImage,
+ final Map<String, Integer>
decidedPartitionCountsForInternalTopics) {
+ return Stream.concat(
+ subtopology.sourceTopics().stream(),
+
subtopology.repartitionSourceTopics().stream().map(StreamsGroupTopologyValue.TopicInfo::name)
+ ).map(
+ topic -> getPartitionCount(topicsImage, topic,
decidedPartitionCountsForInternalTopics).orElseThrow(
+ () -> new IllegalStateException("Number of partitions must be
set for topic " + topic)
+ )
+ ).max(Integer::compareTo).orElseThrow(
+ () -> new IllegalStateException("Subtopology does not contain any
source topics")
+ );
+ }
+
private static ConfiguredInternalTopic fromPersistedTopicInfo(final
StreamsGroupTopologyValue.TopicInfo topicInfo,
final
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
if (topicInfo.partitions() == 0 &&
!decidedPartitionCountsForInternalTopics.containsKey(topicInfo.name())) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e83cb00af45..dcc2a4ca5f3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -16014,6 +16015,67 @@ public class GroupMetadataManagerTest {
assertEquals(100, result.response().data().memberEpoch());
}
+ @Test
+ public void testStreamsOwnedTasksValidation() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String subtopologyMissing = "subtopologyMissing";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .build())
+ .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+ .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+ .build())
+ .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+ .withTargetAssignment(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+ .withTargetAssignmentEpoch(10)
+ )
+ .build();
+
+ InvalidRequestException e1 =
assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(subtopologyMissing)
+ .setPartitions(List.of(0))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+ assertEquals(e1.getMessage(), "Subtopology subtopologyMissing does not
exist in the topology.");
+
+ InvalidRequestException e2 =
assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(3))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())));
+ assertEquals(e2.getMessage(), "Task 3 for subtopology subtopology1 is
invalid. Number of tasks for this subtopology: 3");
+ }
+
@Test
public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
String groupId = "fooup";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index b55b05d30d9..ad69879af32 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -713,7 +713,7 @@ public class TargetAssignmentBuilderTest {
String subtopologyId = Uuid.randomUuid().toString();
Uuid topicId = Uuid.randomUuid();
topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numTasks);
- subtopologies.put(subtopologyId, new
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(),
Map.of()));
+ subtopologies.put(subtopologyId, new
ConfiguredSubtopology(numTasks, Set.of(topicId.toString()), Map.of(), Set.of(),
Map.of()));
return subtopologyId;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
index a39914db1bc..fa047be2a8a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -82,11 +81,9 @@ class TopologyMetadataTest {
@Test
void testMaxNumInputPartitions() {
- ConfiguredInternalTopic internalTopic =
mock(ConfiguredInternalTopic.class);
ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
subtopologyMap.put("subtopology1", subtopology);
- when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
-
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
internalTopic));
+ when(subtopology.numberOfTasks()).thenReturn(4);
assertEquals(4,
topologyMetadata.maxNumInputPartitions("subtopology1"));
}
@@ -111,14 +108,4 @@ class TopologyMetadataTest {
void
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
assertThrows(NoSuchElementException.class, () ->
topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
}
-
- @Test
- void
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics()
{
- ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
- when(subtopology.sourceTopics()).thenReturn(Set.of());
- when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
- subtopologyMap.put("subtopology1", subtopology);
-
- assertThrows(IllegalStateException.class, () ->
topologyMetadata.maxNumInputPartitions("subtopology1"));
- }
-}
\ No newline at end of file
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
index a7082248857..11280b9426b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -36,6 +36,7 @@ public class ConfiguredSubtopologyTest {
public void testConstructorWithNullSourceTopics() {
assertThrows(NullPointerException.class,
() -> new ConfiguredSubtopology(
+ 2,
null,
Map.of(),
Set.of(),
@@ -48,6 +49,7 @@ public class ConfiguredSubtopologyTest {
public void testConstructorWithNullRepartitionSourceTopics() {
assertThrows(NullPointerException.class,
() -> new ConfiguredSubtopology(
+ 2,
Set.of(),
null,
Set.of(),
@@ -60,6 +62,7 @@ public class ConfiguredSubtopologyTest {
public void testConstructorWithNullRepartitionSinkTopics() {
assertThrows(NullPointerException.class,
() -> new ConfiguredSubtopology(
+ 2,
Set.of(),
Map.of(),
null,
@@ -72,6 +75,7 @@ public class ConfiguredSubtopologyTest {
public void testConstructorWithNullStateChangelogTopics() {
assertThrows(NullPointerException.class,
() -> new ConfiguredSubtopology(
+ 2,
Set.of(),
Map.of(),
Set.of(),
@@ -80,6 +84,19 @@ public class ConfiguredSubtopologyTest {
);
}
+ @Test
+ public void testConstructorWithNegativeTaskCount() {
+ assertThrows(IllegalArgumentException.class,
+ () -> new ConfiguredSubtopology(
+ -1,
+ Set.of(),
+ Map.of(),
+ Set.of(),
+ Map.of()
+ )
+ );
+ }
+
@Test
public void testAsStreamsGroupDescribeSubtopology() {
String subtopologyId = "subtopology1";
@@ -91,7 +108,7 @@ public class ConfiguredSubtopologyTest {
Map<String, ConfiguredInternalTopic> repartitionSourceTopics =
Map.of("repartitionSourceTopic1", internalTopicMock);
Map<String, ConfiguredInternalTopic> stateChangelogTopics =
Map.of("stateChangelogTopic1", internalTopicMock);
ConfiguredSubtopology configuredSubtopology = new
ConfiguredSubtopology(
- sourceTopics, repartitionSourceTopics, repartitionSinkTopics,
stateChangelogTopics);
+ 1, sourceTopics, repartitionSourceTopics, repartitionSinkTopics,
stateChangelogTopics);
StreamsGroupDescribeResponseData.Subtopology subtopology =
configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
index cba28e0163d..eea0d0051f3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -65,9 +65,9 @@ class EndpointToPartitionsManagerTest {
streamsGroup = mock(StreamsGroup.class);
streamsGroupMember = mock(StreamsGroupMember.class);
configuredTopology = mock(ConfiguredTopology.class);
- configuredSubtopologyOne = new
ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new
HashMap<>());
+ configuredSubtopologyOne = new ConfiguredSubtopology(1,
Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new HashMap<>());
Map<String, ConfiguredInternalTopic> repartitionSourceTopics =
Map.of("Topic-B", new ConfiguredInternalTopic("Topic-B", 1,
Optional.of((short) 1), Collections.emptyMap()));
- configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(),
repartitionSourceTopics, new HashSet<>(), new HashMap<>());
+ configuredSubtopologyTwo = new ConfiguredSubtopology(1, new
HashSet<>(), repartitionSourceTopics, new HashSet<>(), new HashMap<>());
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap =
new TreeMap<>();
configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyTwoMap =
new TreeMap<>();
@@ -128,7 +128,7 @@ class EndpointToPartitionsManagerTest {
.addTopic(Uuid.randomUuid(), "Topic-A", topicAPartitions)
.addTopic(Uuid.randomUuid(), "Topic-B", topicBPartitions)
.build();
- configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A",
"Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
+ configuredSubtopologyOne = new
ConfiguredSubtopology(Math.max(topicAPartitions, topicBPartitions),
Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new
HashMap<>());
activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
when(streamsGroupMember.assignedTasks()).thenReturn(new
TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
@@ -160,4 +160,4 @@ class EndpointToPartitionsManagerTest {
arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should
assign correct partitions when partitions same between topics")
);
}
-}
\ No newline at end of file
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index f3b40dc282b..07402ac8ebf 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -107,6 +107,7 @@ class InternalTopicManagerTest {
return mkMap(
mkEntry(SUBTOPOLOGY_1,
new ConfiguredSubtopology(
+ 2,
Set.of(SOURCE_TOPIC_1),
Map.of(),
Set.of(REPARTITION_TOPIC),
@@ -121,6 +122,7 @@ class InternalTopicManagerTest {
),
mkEntry(SUBTOPOLOGY_2,
new ConfiguredSubtopology(
+ 2,
Set.of(SOURCE_TOPIC_2),
Map.of(REPARTITION_TOPIC,
new ConfiguredInternalTopic(REPARTITION_TOPIC,