This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 678d456ad7d KAFKA-19044: Handle tasks that are not present in the 
current topology (#19722)
678d456ad7d is described below

commit 678d456ad7d8bf0d2a3ab72e5e419a70e3c6b91a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 4 20:22:52 2025 +0200

    KAFKA-19044: Handle tasks that are not present in the current topology 
(#19722)
    
    A heartbeat might be sent to the group coordinator, claiming to own
    tasks that we do not know about. We need some logic to handle those
    requests. In KIP-1071, we propose to return an `INVALID_REQUEST` error
    whenever this happens, effectively letting the clients crash.
    
    This behavior will, however, make topology updates impossible. Bruno
    Cadonna proposed to only check that owned tasks match our set of
    expected tasks if the topology epochs between the group and the client
    match. The aim of this change is to implement a check and a behavior
    for the first version of the protocol, which is to always return
    `INVALID_REQUEST` if an unknown task is sent to the group coordinator.
    
    We can relax this constraint once we allow topology updating with
    topology epochs.
    
    To efficiently check this whenever we receive a heartbeat containing
    tasks, we precompute the number of tasks for each subtopology. This also
    benefits the performance of the assignor.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 37 +++++++++++++
 .../group/streams/TopologyMetadata.java            | 23 +-------
 .../group/streams/topics/ChangelogTopics.java      |  6 ++-
 .../streams/topics/ConfiguredSubtopology.java      | 12 ++++-
 .../group/streams/topics/InternalTopicManager.java | 19 ++++++-
 .../group/GroupMetadataManagerTest.java            | 62 ++++++++++++++++++++++
 .../group/streams/TargetAssignmentBuilderTest.java |  2 +-
 .../group/streams/TopologyMetadataTest.java        | 17 +-----
 .../streams/topics/ConfiguredSubtopologyTest.java  | 19 ++++++-
 .../topics/EndpointToPartitionsManagerTest.java    |  8 +--
 .../streams/topics/InternalTopicManagerTest.java   |  2 +
 11 files changed, 160 insertions(+), 47 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a3115c930a5..4f1f0775efd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -156,6 +156,7 @@ import 
org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
@@ -195,6 +196,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -1665,6 +1667,34 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Validates that the requested tasks exist in the configured topology and 
partitions are valid.
+     * If tasks is null, does nothing. If an invalid task is found, throws 
InvalidRequestException.
+     *
+     * @param subtopologySortedMap The configured topology.
+     * @param tasks                The list of requested tasks.
+     */
+    private static void throwIfRequestContainsInvalidTasks(
+        SortedMap<String, ConfiguredSubtopology> subtopologySortedMap,
+        List<StreamsGroupHeartbeatRequestData.TaskIds> tasks
+    ) {
+        if (tasks == null || tasks.isEmpty()) return;
+        for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
+            String subtopologyId = task.subtopologyId();
+            ConfiguredSubtopology subtopology = 
subtopologySortedMap.get(subtopologyId);
+            if (subtopology == null) {
+                throw new InvalidRequestException("Subtopology " + 
subtopologyId + " does not exist in the topology.");
+            }
+            int numTasks = subtopology.numberOfTasks();
+            for (Integer partition : task.partitions()) {
+                if (partition < 0 || partition >= numTasks) {
+                    throw new InvalidRequestException("Task " + partition + " 
for subtopology " + subtopologyId +
+                        " is invalid. Number of tasks for this subtopology: " 
+ numTasks);
+                }
+            }
+        }
+    }
+
     /**
      * Validates if the received classic member protocols are supported by the 
group.
      *
@@ -1917,6 +1947,13 @@ public class GroupMetadataManager {
             updatedConfiguredTopology = group.configuredTopology().get();
         }
 
+        if (updatedConfiguredTopology.isReady()) {
+            SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = 
updatedConfiguredTopology.subtopologies().get();
+            throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedActiveTasks);
+            throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedStandbyTasks);
+            throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedWarmupTasks);
+        }
+
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
index 0241083233b..b91bfc98706 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -19,14 +19,12 @@ package org.apache.kafka.coordinator.group.streams;
 import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.TopicImage;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.SortedMap;
-import java.util.stream.Stream;
 
 /**
  * The topology metadata class is used by the {@link 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic 
and
@@ -42,14 +40,6 @@ public record TopologyMetadata(MetadataImage metadataImage, 
SortedMap<String, Co
         subtopologyMap = 
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
     }
 
-    /**
-     * @return The metadata image in topology metadata.
-     */
-    @Override
-    public MetadataImage metadataImage() {
-        return this.metadataImage;
-    }
-
     /**
      * Checks whether the given subtopology is associated with a changelog 
topic.
      *
@@ -85,18 +75,7 @@ public record TopologyMetadata(MetadataImage metadataImage, 
SortedMap<String, Co
     @Override
     public int maxNumInputPartitions(String subtopologyId) {
         final ConfiguredSubtopology subtopology = 
getSubtopologyOrFail(subtopologyId);
-        return Stream.concat(
-            subtopology.sourceTopics().stream(),
-            subtopology.repartitionSourceTopics().keySet().stream()
-        ).map(topic -> {
-            TopicImage topicImage = metadataImage.topics().getTopic(topic);
-            if (topicImage == null) {
-                throw new IllegalStateException("Topic " + topic + " not found 
in metadata image");
-            }
-            return topicImage.partitions().size();
-        }).max(Integer::compareTo).orElseThrow(
-            () -> new IllegalStateException("Subtopology does not contain any 
source topics")
-        );
+        return subtopology.numberOfTasks();
     }
 
     private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
index 8db69e2f2a3..b69858cd0ed 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java
@@ -85,8 +85,10 @@ public class ChangelogTopics {
             }
         }
 
-        log.debug("Expecting state changelog topic partitions {} for the 
requested topology.",
-            changelogTopicPartitions.entrySet().stream().map(e -> e.getKey() + 
":" + e.getValue()).collect(Collectors.joining(", ")));
+        if (!changelogTopicPartitions.isEmpty()) {
+            log.debug("Expecting state changelog topic partitions {} for the 
requested topology.",
+                changelogTopicPartitions.entrySet().stream().map(e -> 
e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")));
+        }
 
         return changelogTopicPartitions;
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
index 8ef41c4967e..41d977dc9f8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -31,17 +31,25 @@ import java.util.Set;
  * <p>
  * Configured subtopologies may be recreated every time the input topics used 
by the subtopology are modified.
  *
+ * @param numberOfTasks           Precomputed number of tasks. Not that not 
every source topic may have a partition for
+ *                                every task, in cases where there are 
multiple source topics with an unequal number of
+ *                                partitions (e.g., one topic has 3 partitions 
and another has 5 and both are used in a
+ *                                merge).
  * @param sourceTopics            The source topics of the subtopology.
  * @param repartitionSourceTopics The repartition source topics of the 
subtopology.
  * @param repartitionSinkTopics   The repartition sink topics of the 
subtopology.
  * @param stateChangelogTopics    The state changelog topics of the 
subtopology.
  */
-public record ConfiguredSubtopology(Set<String> sourceTopics,
+public record ConfiguredSubtopology(int numberOfTasks,
+                                    Set<String> sourceTopics,
                                     Map<String, ConfiguredInternalTopic> 
repartitionSourceTopics,
                                     Set<String> repartitionSinkTopics,
                                     Map<String, ConfiguredInternalTopic> 
stateChangelogTopics) {
 
     public ConfiguredSubtopology {
+        if (numberOfTasks <= 0) {
+            throw new IllegalArgumentException("Number of tasks must be 
positive");
+        }
         Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
         Objects.requireNonNull(repartitionSourceTopics, 
"repartitionSourceTopics can't be null");
         Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics 
can't be null");
@@ -61,4 +69,6 @@ public record ConfiguredSubtopology(Set<String> sourceTopics,
                 
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
     }
 
+
+
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 490289c2c85..c1845ca2e24 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -82,7 +82,7 @@ public class InternalTopicManager {
                 subtopologies.stream()
                     .collect(Collectors.toMap(
                         StreamsGroupTopologyValue.Subtopology::subtopologyId,
-                        x -> fromPersistedSubtopology(x, 
decidedPartitionCountsForInternalTopics),
+                        x -> fromPersistedSubtopology(x, topicsImage, 
decidedPartitionCountsForInternalTopics),
                         (v1, v2) -> {
                             throw new 
RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
                         },
@@ -264,9 +264,11 @@ public class InternalTopicManager {
     }
 
     private static ConfiguredSubtopology fromPersistedSubtopology(final 
StreamsGroupTopologyValue.Subtopology subtopology,
+                                                                  final 
TopicsImage topicsImage,
                                                                   final 
Map<String, Integer> decidedPartitionCountsForInternalTopics
     ) {
         return new ConfiguredSubtopology(
+            computeNumberOfTasks(subtopology, topicsImage, 
decidedPartitionCountsForInternalTopics),
             new HashSet<>(subtopology.sourceTopics()),
             subtopology.repartitionSourceTopics().stream()
                 .map(x -> fromPersistedTopicInfo(x, 
decidedPartitionCountsForInternalTopics))
@@ -278,6 +280,21 @@ public class InternalTopicManager {
         );
     }
 
+    private static int computeNumberOfTasks(final 
StreamsGroupTopologyValue.Subtopology subtopology,
+                                            final TopicsImage topicsImage,
+                                            final Map<String, Integer> 
decidedPartitionCountsForInternalTopics) {
+        return Stream.concat(
+            subtopology.sourceTopics().stream(),
+            
subtopology.repartitionSourceTopics().stream().map(StreamsGroupTopologyValue.TopicInfo::name)
+        ).map(
+            topic -> getPartitionCount(topicsImage, topic, 
decidedPartitionCountsForInternalTopics).orElseThrow(
+                () -> new IllegalStateException("Number of partitions must be 
set for topic " + topic)
+            )
+        ).max(Integer::compareTo).orElseThrow(
+            () -> new IllegalStateException("Subtopology does not contain any 
source topics")
+        );
+    }
+
     private static ConfiguredInternalTopic fromPersistedTopicInfo(final 
StreamsGroupTopologyValue.TopicInfo topicInfo,
                                                                   final 
Map<String, Integer> decidedPartitionCountsForInternalTopics) {
         if (topicInfo.partitions() == 0 && 
!decidedPartitionCountsForInternalTopics.containsKey(topicInfo.name())) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index e83cb00af45..dcc2a4ca5f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.errors.GroupMaxSizeReachedException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
 import org.apache.kafka.common.errors.InvalidRegularExpression;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -16014,6 +16015,67 @@ public class GroupMetadataManagerTest {
         assertEquals(100, result.response().data().memberEpoch());
     }
 
+    @Test
+    public void testStreamsOwnedTasksValidation() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String subtopologyMissing = "subtopologyMissing";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 3)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                    .build())
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withTargetAssignment(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                .withTargetAssignmentEpoch(10)
+            )
+            .build();
+
+        InvalidRequestException e1 = 
assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatRequestData.TaskIds()
+                        .setSubtopologyId(subtopologyMissing)
+                        .setPartitions(List.of(0))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals(e1.getMessage(), "Subtopology subtopologyMissing does not 
exist in the topology.");
+
+        InvalidRequestException e2 = 
assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatRequestData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(3))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())));
+        assertEquals(e2.getMessage(), "Task 3 for subtopology subtopology1 is 
invalid. Number of tasks for this subtopology: 3");
+    }
+
     @Test
     public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
         String groupId = "fooup";
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index b55b05d30d9..ad69879af32 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -713,7 +713,7 @@ public class TargetAssignmentBuilderTest {
             String subtopologyId = Uuid.randomUuid().toString();
             Uuid topicId = Uuid.randomUuid();
             topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numTasks);
-            subtopologies.put(subtopologyId, new 
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), 
Map.of()));
+            subtopologies.put(subtopologyId, new 
ConfiguredSubtopology(numTasks, Set.of(topicId.toString()), Map.of(), Set.of(), 
Map.of()));
 
             return subtopologyId;
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
index a39914db1bc..fa047be2a8a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -82,11 +81,9 @@ class TopologyMetadataTest {
 
     @Test
     void testMaxNumInputPartitions() {
-        ConfiguredInternalTopic internalTopic = 
mock(ConfiguredInternalTopic.class);
         ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
         subtopologyMap.put("subtopology1", subtopology);
-        when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
-        
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
 internalTopic));
+        when(subtopology.numberOfTasks()).thenReturn(4);
 
         assertEquals(4, 
topologyMetadata.maxNumInputPartitions("subtopology1"));
     }
@@ -111,14 +108,4 @@ class TopologyMetadataTest {
     void 
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
         assertThrows(NoSuchElementException.class, () -> 
topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
     }
-
-    @Test
-    void 
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics() 
{
-        ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
-        when(subtopology.sourceTopics()).thenReturn(Set.of());
-        when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
-        subtopologyMap.put("subtopology1", subtopology);
-
-        assertThrows(IllegalStateException.class, () -> 
topologyMetadata.maxNumInputPartitions("subtopology1"));
-    }
-}
\ No newline at end of file
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
index a7082248857..11280b9426b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -36,6 +36,7 @@ public class ConfiguredSubtopologyTest {
     public void testConstructorWithNullSourceTopics() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredSubtopology(
+                2,
                 null,
                 Map.of(),
                 Set.of(),
@@ -48,6 +49,7 @@ public class ConfiguredSubtopologyTest {
     public void testConstructorWithNullRepartitionSourceTopics() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredSubtopology(
+                2,
                 Set.of(),
                 null,
                 Set.of(),
@@ -60,6 +62,7 @@ public class ConfiguredSubtopologyTest {
     public void testConstructorWithNullRepartitionSinkTopics() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredSubtopology(
+                2,
                 Set.of(),
                 Map.of(),
                 null,
@@ -72,6 +75,7 @@ public class ConfiguredSubtopologyTest {
     public void testConstructorWithNullStateChangelogTopics() {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredSubtopology(
+                2,
                 Set.of(),
                 Map.of(),
                 Set.of(),
@@ -80,6 +84,19 @@ public class ConfiguredSubtopologyTest {
         );
     }
 
+    @Test
+    public void testConstructorWithNegativeTaskCount() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new ConfiguredSubtopology(
+                -1,
+                Set.of(),
+                Map.of(),
+                Set.of(),
+                Map.of()
+            )
+        );
+    }
+
     @Test
     public void testAsStreamsGroupDescribeSubtopology() {
         String subtopologyId = "subtopology1";
@@ -91,7 +108,7 @@ public class ConfiguredSubtopologyTest {
         Map<String, ConfiguredInternalTopic> repartitionSourceTopics = 
Map.of("repartitionSourceTopic1", internalTopicMock);
         Map<String, ConfiguredInternalTopic> stateChangelogTopics = 
Map.of("stateChangelogTopic1", internalTopicMock);
         ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology(
-            sourceTopics, repartitionSourceTopics, repartitionSinkTopics, 
stateChangelogTopics);
+            1, sourceTopics, repartitionSourceTopics, repartitionSinkTopics, 
stateChangelogTopics);
 
         StreamsGroupDescribeResponseData.Subtopology subtopology = 
configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
index cba28e0163d..eea0d0051f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -65,9 +65,9 @@ class EndpointToPartitionsManagerTest {
         streamsGroup = mock(StreamsGroup.class);
         streamsGroupMember = mock(StreamsGroupMember.class);
         configuredTopology = mock(ConfiguredTopology.class);
-        configuredSubtopologyOne = new 
ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new 
HashMap<>());
+        configuredSubtopologyOne = new ConfiguredSubtopology(1, 
Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new HashMap<>());
         Map<String, ConfiguredInternalTopic> repartitionSourceTopics = 
Map.of("Topic-B",  new ConfiguredInternalTopic("Topic-B", 1, 
Optional.of((short) 1), Collections.emptyMap()));
-        configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(), 
repartitionSourceTopics, new HashSet<>(), new HashMap<>());
+        configuredSubtopologyTwo = new ConfiguredSubtopology(1, new 
HashSet<>(), repartitionSourceTopics, new HashSet<>(), new HashMap<>());
         SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = 
new TreeMap<>();
         configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
         SortedMap<String, ConfiguredSubtopology> configuredSubtopologyTwoMap = 
new TreeMap<>();
@@ -128,7 +128,7 @@ class EndpointToPartitionsManagerTest {
             .addTopic(Uuid.randomUuid(), "Topic-A", topicAPartitions)
             .addTopic(Uuid.randomUuid(), "Topic-B", topicBPartitions)
             .build();
-        configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", 
"Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
+        configuredSubtopologyOne = new 
ConfiguredSubtopology(Math.max(topicAPartitions, topicBPartitions), 
Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new 
HashMap<>());
 
         activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
         when(streamsGroupMember.assignedTasks()).thenReturn(new 
TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
@@ -160,4 +160,4 @@ class EndpointToPartitionsManagerTest {
                 arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should 
assign correct partitions when partitions same between topics")
         );
     }
-}
\ No newline at end of file
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index f3b40dc282b..07402ac8ebf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -107,6 +107,7 @@ class InternalTopicManagerTest {
         return mkMap(
             mkEntry(SUBTOPOLOGY_1,
                 new ConfiguredSubtopology(
+                    2,
                     Set.of(SOURCE_TOPIC_1),
                     Map.of(),
                     Set.of(REPARTITION_TOPIC),
@@ -121,6 +122,7 @@ class InternalTopicManagerTest {
             ),
             mkEntry(SUBTOPOLOGY_2,
                 new ConfiguredSubtopology(
+                    2,
                     Set.of(SOURCE_TOPIC_2),
                     Map.of(REPARTITION_TOPIC,
                         new ConfiguredInternalTopic(REPARTITION_TOPIC,

Reply via email to