This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8b6094c4e328f04cfd92afa2c094c2dd94ef3904
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Oct 7 13:01:15 2024 +0200

    MINOR: Use subtopologyId and spelling consistently (#17370)
    
    Throughout the RPCs, we were using subtopology instead of subtopologyId
    for the string ID of a subtopology. We were also using the spelling
    sub-topology in some places. Clean this up before merging the
    auto-topic creation code.
---
 .../StreamsGroupHeartbeatRequestManager.java       |  8 +++---
 .../message/StreamsGroupDescribeResponse.json      | 14 +++++-----
 .../message/StreamsGroupHeartbeatRequest.json      |  8 +++---
 .../message/StreamsGroupHeartbeatResponse.json     |  2 +-
 .../message/StreamsGroupInitializeRequest.json     |  8 +++---
 .../StreamsGroupHeartbeatRequestManagerTest.java   |  6 ++--
 .../coordinator/group/GroupMetadataManager.java    |  4 +--
 .../coordinator/group/streams/Assignment.java      |  6 ++--
 .../streams/CoordinatorStreamsRecordHelpers.java   | 10 +++----
 .../group/streams/CurrentAssignmentBuilder.java    |  8 +++---
 .../group/streams/StreamsGroupMember.java          |  4 +--
 .../coordinator/group/streams/StreamsTopology.java |  6 ++--
 .../StreamsGroupCurrentMemberAssignmentValue.json  |  2 +-
 .../StreamsGroupTargetAssignmentMemberValue.json   |  2 +-
 .../common/message/StreamsGroupTopologyValue.json  |  6 ++--
 .../coordinator/group/streams/AssignmentTest.java  | 12 ++++----
 .../CoordinatorStreamsRecordHelpersTest.java       |  2 +-
 .../streams/CurrentAssignmentBuilderTest.java      |  4 +--
 .../group/streams/StreamsGroupMemberTest.java      | 30 ++++++++++----------
 .../group/streams/StreamsTopologyTest.java         | 32 +++++++++++-----------
 .../group/streams/TargetAssignmentBuilderTest.java |  2 +-
 21 files changed, 88 insertions(+), 88 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index f41ed9d0998..3a80765206d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -271,8 +271,8 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
     private void setTargetAssignmentForConsumerGroup(final 
StreamsGroupHeartbeatResponseData data, final 
ConsumerGroupHeartbeatResponseData cgData) {
         Map<String, TopicPartitions> tps = new HashMap<>();
         data.activeTasks().forEach(taskId -> Stream.concat(
-                
streamsInterface.subtopologyMap().get(taskId.subtopology()).sourceTopics.stream(),
-                
streamsInterface.subtopologyMap().get(taskId.subtopology()).repartitionSourceTopics.keySet().stream()
+                
streamsInterface.subtopologyMap().get(taskId.subtopologyId()).sourceTopics.stream(),
+                
streamsInterface.subtopologyMap().get(taskId.subtopologyId()).repartitionSourceTopics.keySet().stream()
             )
             .forEach(topic -> {
                 final TopicPartitions toInsert = tps.computeIfAbsent(topic, k 
-> {
@@ -323,7 +323,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         target.clear();
         source.forEach(taskId -> {
             taskId.partitions().forEach(partition -> {
-                target.add(new 
StreamsAssignmentInterface.TaskId(taskId.subtopology(), partition));
+                target.add(new 
StreamsAssignmentInterface.TaskId(taskId.subtopologyId(), partition));
             });
         });
     }
@@ -602,7 +602,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
                 .stream()
                 .map(entry -> {
                     TaskIds ids = new TaskIds();
-                    ids.setSubtopology(entry.getKey());
+                    ids.setSubtopologyId(entry.getKey());
                     ids.setPartitions(entry.getValue());
                     return ids;
                 })
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 9cf580f0441..227ac495c2d 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -48,8 +48,8 @@
         { "name":  "Topology", "type": "[]Subtopology", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
           "about": "The sub-topologies of the streams application. Null if 
uninitialized.",
           "fields": [
-            { "name": "Subtopology", "type": "string", "versions": "0+",
-              "about": "String to uniquely identify the sub-topology." },
+            { "name": "SubtopologyId", "type": "string", "versions": "0+",
+              "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
             { "name": "SourceTopics", "type": "[]string", "versions": "0+",
               "about": "The topics the topology reads from." },
             { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
@@ -57,7 +57,7 @@
             { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
               "about": "The repartition topics the topology writes to." },
             { "name": "StateChangelogTopics", "type": "[]TopicInfo", 
"versions": "0+",
-              "about": "The set of state changelog topics associated with this 
sub-topology. Created automatically." },
+              "about": "The set of state changelog topics associated with this 
subtopology. Created automatically." },
             { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
               "about": "The set of source topics that are internally created 
repartition topics. Created automatically." }
           ]
@@ -103,8 +103,8 @@
   ],
   "commonStructs": [
     { "name": "TaskOffset", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "The sub-topology identifier." },
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
+        "about": "The subtopology identifier." },
       { "name": "Partition", "type": "int32", "versions": "0+",
         "about": "The partition." },
       { "name": "Offset", "type": "int64", "versions": "0+",
@@ -127,8 +127,8 @@
         "about": "Warm-up tasks for this client. " }
     ]},
     { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "The sub-topology identifier." },
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
+        "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions of the input topics processed by this 
member." }
     ]},
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 7f829f1e410..ec28c2766fb 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -72,16 +72,16 @@
         "about": "value of the config" }
     ]},
     { "name": "TaskOffset", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "The sub-topology identifier." },
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
+        "about": "The subtopology identifier." },
       { "name": "Partition", "type": "int32", "versions": "0+",
         "about": "The partition." },
       { "name": "Offset", "type": "int64", "versions": "0+",
         "about": "The offset." }
     ]},
     { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "The sub-topology identifier." },
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
+        "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions of the input topics processed by this 
member." }
     ]}
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index ac11b3d8264..40e134dc407 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -96,7 +96,7 @@
         "about": "partitions" }
     ]},
     { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions of the input topics processed by this 
member." }
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 3223d1e8d9e..05c4709092c 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -29,15 +29,15 @@
       "about": "The sub-topologies of the streams application.",
       "fields": [
         { "name": "SubtopologyId", "type": "string", "versions": "0+",
-          "about": "String to uniquely identify the sub-topology. 
Deterministically generated from the topology" },
+          "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
         { "name": "SourceTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-          "about": "The regular expressions identifying topics the 
sub-topology reads from." },
+          "about": "The regular expressions identifying topics the subtopology 
reads from." },
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": 
"0+",
-          "about": "The set of state changelog topics associated with this 
sub-topology. Created automatically." },
+          "about": "The set of state changelog topics associated with this 
subtopology. Created automatically." },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
-          "about": "The repartition topics the sub-topology writes to." },
+          "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
           "about": "The set of source topics that are internally created 
repartition topics. Created automatically." }
       ]
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 503576d8fd5..a635d7d418e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -282,11 +282,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
             .setThrottleTimeMs(TEST_THROTTLE_TIME_MS)
             .setHeartbeatIntervalMs(1000)
             .setActiveTasks(Collections.singletonList(
-                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("0").setPartitions(Collections.singletonList(0))))
+                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("0").setPartitions(Collections.singletonList(0))))
             .setStandbyTasks(Collections.singletonList(
-                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("1").setPartitions(Collections.singletonList(1))))
+                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("1").setPartitions(Collections.singletonList(1))))
             .setWarmupTasks(Collections.singletonList(
-                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("2").setPartitions(Collections.singletonList(2))));
+                new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("2").setPartitions(Collections.singletonList(2))));
 
         mockResponse(data);
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dfb768f9fb5..80785af9c76 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1778,7 +1778,7 @@ public class GroupMetadataManager {
         if (ownedTasks == null) return false;
 
         for (StreamsGroupHeartbeatRequestData.TaskIds topicPartitions : 
ownedTasks) {
-            Set<Integer> partitions = 
target.get(topicPartitions.subtopology());
+            Set<Integer> partitions = 
target.get(topicPartitions.subtopologyId());
             if (partitions == null) return false;
             for (Integer partitionId : topicPartitions.partitions()) {
                 if (!partitions.contains(partitionId)) return false;
@@ -2438,7 +2438,7 @@ public class GroupMetadataManager {
     private List<StreamsGroupHeartbeatResponseData.TaskIds> 
createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> 
taskIds) {
         return taskIds.entrySet().stream()
             .map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds()
-                .setSubtopology(entry.getKey())
+                .setSubtopologyId(entry.getKey())
                 .setPartitions(new ArrayList<>(entry.getValue())))
             .collect(Collectors.toList());
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
index b80e34cbfe5..09a83323072 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -108,19 +108,19 @@ public class Assignment {
         return new Assignment(
             record.activeTasks().stream()
                 .collect(Collectors.toMap(
-                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
                         taskId -> new HashSet<>(taskId.partitions())
                     )
                 ),
             record.standbyTasks().stream()
                 .collect(Collectors.toMap(
-                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
                         taskId -> new HashSet<>(taskId.partitions())
                     )
                 ),
             record.warmupTasks().stream()
                 .collect(Collectors.toMap(
-                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+                        
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
                         taskId -> new HashSet<>(taskId.partitions())
                     )
                 )
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 48a31fdd4c7..7a7ee7f2ae2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -213,7 +213,7 @@ public class CoordinatorStreamsRecordHelpers {
         for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) {
             activeTaskIds.add(
                 new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-                    .setSubtopology(entry.getKey())
+                    .setSubtopologyId(entry.getKey())
                     .setPartitions(new ArrayList<>(entry.getValue()))
             );
         }
@@ -221,7 +221,7 @@ public class CoordinatorStreamsRecordHelpers {
         for (Map.Entry<String, Set<Integer>> entry : standbyTasks.entrySet()) {
             standbyTaskIds.add(
                 new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-                    .setSubtopology(entry.getKey())
+                    .setSubtopologyId(entry.getKey())
                     .setPartitions(new ArrayList<>(entry.getValue()))
             );
         }
@@ -229,7 +229,7 @@ public class CoordinatorStreamsRecordHelpers {
         for (Map.Entry<String, Set<Integer>> entry : warmupTasks.entrySet()) {
             warmupTaskIds.add(
                 new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-                    .setSubtopology(entry.getKey())
+                    .setSubtopologyId(entry.getKey())
                     .setPartitions(new ArrayList<>(entry.getValue()))
             );
         }
@@ -366,7 +366,7 @@ public class CoordinatorStreamsRecordHelpers {
         List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new 
ArrayList<>(tasks.size());
         tasks.forEach((subtopologyId, partitions) ->
             taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
-                .setSubtopology(subtopologyId)
+                .setSubtopologyId(subtopologyId)
                 .setPartitions(new ArrayList<>(partitions)))
         );
         return taskIds;
@@ -399,7 +399,7 @@ public class CoordinatorStreamsRecordHelpers {
                 return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
             }).collect(Collectors.toList());
 
-            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
+            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
                 
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
                 
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
         });
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
index 24e8a2f2a6c..974f465a740 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -168,17 +168,17 @@ public class CurrentAssignmentBuilder {
         } else {
             this.ownedActiveTasks = 
ownedAssignment.activeTasks().entrySet().stream()
                 .map(activeTaskIds -> new 
StreamsGroupHeartbeatRequestData.TaskIds()
-                    .setSubtopology(activeTaskIds.getKey())
+                    .setSubtopologyId(activeTaskIds.getKey())
                     .setPartitions(new ArrayList<>(activeTaskIds.getValue())))
                 .collect(Collectors.toList());
             this.ownedStandbyTasks = 
ownedAssignment.standbyTasks().entrySet().stream()
                 .map(standbyTaskIds -> new 
StreamsGroupHeartbeatRequestData.TaskIds()
-                    .setSubtopology(standbyTaskIds.getKey())
+                    .setSubtopologyId(standbyTaskIds.getKey())
                     .setPartitions(new ArrayList<>(standbyTaskIds.getValue())))
                 .collect(Collectors.toList());
             this.ownedWarmupTasks = 
ownedAssignment.warmupTasks().entrySet().stream()
                 .map(warmupTaskIds -> new 
StreamsGroupHeartbeatRequestData.TaskIds()
-                    .setSubtopology(warmupTaskIds.getKey())
+                    .setSubtopologyId(warmupTaskIds.getKey())
                     .setPartitions(new ArrayList<>(warmupTaskIds.getValue())))
                 .collect(Collectors.toList());
         }
@@ -329,7 +329,7 @@ public class CurrentAssignmentBuilder {
 
         for (StreamsGroupHeartbeatRequestData.TaskIds owned : ownedTasks) {
             Set<Integer> tasksPendingRevocation =
-                assignment.getOrDefault(owned.subtopology(), 
Collections.emptySet());
+                assignment.getOrDefault(owned.subtopologyId(), 
Collections.emptySet());
 
             for (Integer partitionId : owned.partitions()) {
                 if (tasksPendingRevocation.contains(partitionId)) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 7caa1f77ff0..12d97d32e62 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -277,7 +277,7 @@ public class StreamsGroupMember {
             List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
topicPartitionsList
         ) {
             return topicPartitionsList.stream().collect(Collectors.toMap(
-                StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopology,
+                
StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
                 taskIds -> Collections.unmodifiableSet(new 
HashSet<>(taskIds.partitions()))));
         }
 
@@ -632,7 +632,7 @@ public class StreamsGroupMember {
         List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new 
ArrayList<>();
         tasks.forEach((subtopologyId, partitionSet) -> {
             taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
-                .setSubtopology(subtopologyId)
+                .setSubtopologyId(subtopologyId)
                 .setPartitions(new ArrayList<>(partitionSet)));
         });
         return taskIds;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index ca2fd9a6da0..16b8f6d5178 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -68,7 +68,7 @@ public class StreamsTopology {
 
             this.subtopologies.put(
                 subtopology.subtopologyId(),
-                new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
+                new 
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
                     
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
                     
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
         });
@@ -92,7 +92,7 @@ public class StreamsTopology {
     public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) 
{
         return new StreamsTopology(
             record.topologyId(),
-            
record.topology().stream().collect(Collectors.toMap(Subtopology::subtopology, x 
-> x))
+            
record.topology().stream().collect(Collectors.toMap(Subtopology::subtopologyId, 
x -> x))
         );
     }
 
@@ -125,7 +125,7 @@ public class StreamsTopology {
         return subtopologies.values().stream().map(
             subtopology -> new StreamsGroupDescribeResponseData.Subtopology()
                 .setSourceTopicRegex(subtopology.sourceTopicRegex())
-                .setSubtopology(subtopology.subtopology())
+                .setSubtopologyId(subtopology.subtopologyId())
                 .setSourceTopics(subtopology.sourceTopics())
                 .setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
                 .setRepartitionSourceTopics(
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index bcb0e6b6eed..8d393de7b6c 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -41,7 +41,7 @@
   ],
   "commonStructs": [
     { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions of the input topics processed by this 
member." }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
index f33e0ace34c..88424950082 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
@@ -29,7 +29,7 @@
   ],
   "commonStructs": [
     { "name": "TaskIds", "versions": "0+", "fields": [
-      { "name": "Subtopology", "type": "string", "versions": "0+",
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partitions of the input topics processed by this 
member." }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index 5ba633095b6..af1fae06aad 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -25,8 +25,8 @@
     { "name":  "Topology", "type": "[]Subtopology", "versions": "0+",
       "about": "The sub-topologies of the streams application.",
       "fields": [
-        { "name": "Subtopology", "type": "string", "versions": "0+",
-          "about": "String to uniquely identify the subtopology." },
+        { "name": "SubtopologyId", "type": "string", "versions": "0+",
+          "about": "String to uniquely identify the subtopology. 
Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
         { "name": "SourceTopicRegex", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
@@ -34,7 +34,7 @@
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": 
"0+",
           "about": "The set of state changelog topics associated with this 
subtopology. " },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
-          "about": "The repartition topics the sub-topology writes to." },
+          "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
           "about": "The set of source topics that are internally created 
repartition topics. " }
       ]
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
index 57a48e2d365..65675136eb7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -64,24 +64,24 @@ public class AssignmentTest {
         String subtopology2 = "subtopology2";
         List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = 
new ArrayList<>();
         activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology1)
+            .setSubtopologyId(subtopology1)
             .setPartitions(Arrays.asList(1, 2, 3)));
         activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology2)
+            .setSubtopologyId(subtopology2)
             .setPartitions(Arrays.asList(4, 5, 6)));
         List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = 
new ArrayList<>();
         standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology1)
+            .setSubtopologyId(subtopology1)
             .setPartitions(Arrays.asList(7, 8, 9)));
         standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology2)
+            .setSubtopologyId(subtopology2)
             .setPartitions(Arrays.asList(1, 2, 3)));
         List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = 
new ArrayList<>();
         warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology1)
+            .setSubtopologyId(subtopology1)
             .setPartitions(Arrays.asList(4, 5, 6)));
         warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
-            .setSubtopology(subtopology2)
+            .setSubtopologyId(subtopology2)
             .setPartitions(Arrays.asList(7, 8, 9)));
 
         StreamsGroupTargetAssignmentMemberValue record = new 
StreamsGroupTargetAssignmentMemberValue()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index 0b291bd098e..839897d4dae 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -64,7 +64,7 @@ class CoordinatorStreamsRecordHelpersTest {
 
         List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
             Collections.singletonList(new 
StreamsGroupTopologyValue.Subtopology()
-                .setSubtopology("subtopology-id")
+                .setSubtopologyId("subtopology-id")
                 .setRepartitionSinkTopics(Collections.singletonList("foo"))
                 .setSourceTopics(Collections.singletonList("bar"))
                 .setRepartitionSourceTopics(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
index 93e40214dd8..d6f83025c7f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
@@ -418,10 +418,10 @@ public class CurrentAssignmentBuilderTest {
             .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
             .withOwnedActiveTasks(Arrays.asList(
                 new StreamsGroupHeartbeatRequestData.TaskIds()
-                    .setSubtopology(subtopologyId1)
+                    .setSubtopologyId(subtopologyId1)
                     .setPartitions(Arrays.asList(2, 3)),
                 new StreamsGroupHeartbeatRequestData.TaskIds()
-                    .setSubtopology(subtopologyId2)
+                    .setSubtopologyId(subtopologyId2)
                     .setPartitions(Arrays.asList(5, 6))))
             .withOwnedStandbyTasks(Collections.emptyList())
             .withOwnedWarmupTasks(Collections.emptyList())
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 0e5a0ffef07..00f6ef6c676 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -248,27 +248,27 @@ public class StreamsGroupMemberTest {
             .setPreviousMemberEpoch(9)
             .setState((byte) 1)
             .setActiveTasks(Collections.singletonList(new TaskIds()
-                .setSubtopology(subtopologyId1)
+                .setSubtopologyId(subtopologyId1)
                 .setPartitions(Arrays.asList(1, 2)))
             )
             .setStandbyTasks(Collections.singletonList(new TaskIds()
-                .setSubtopology(subtopologyId2)
+                .setSubtopologyId(subtopologyId2)
                 .setPartitions(Arrays.asList(3, 4)))
             )
             .setWarmupTasks(Collections.singletonList(new TaskIds()
-                .setSubtopology(subtopologyId1)
+                .setSubtopologyId(subtopologyId1)
                 .setPartitions(Arrays.asList(5, 6)))
             )
             .setActiveTasksPendingRevocation(Collections.singletonList(new 
TaskIds()
-                .setSubtopology(subtopologyId2)
+                .setSubtopologyId(subtopologyId2)
                 .setPartitions(Arrays.asList(7, 8)))
             )
             .setStandbyTasksPendingRevocation(Collections.singletonList(new 
TaskIds()
-                .setSubtopology(subtopologyId1)
+                .setSubtopologyId(subtopologyId1)
                 .setPartitions(Arrays.asList(9, 10)))
             )
             .setWarmupTasksPendingRevocation(Collections.singletonList(new 
TaskIds()
-                .setSubtopology(subtopologyId2)
+                .setSubtopologyId(subtopologyId2)
                 .setPartitions(Arrays.asList(11, 12)))
             );
 
@@ -300,13 +300,13 @@ public class StreamsGroupMemberTest {
             .setMemberEpoch(epoch)
             .setPreviousMemberEpoch(epoch - 1)
             .setActiveTasks(Collections.singletonList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
-                .setSubtopology(subTopology1)
+                .setSubtopologyId(subTopology1)
                 .setPartitions(assignedTasks1)))
             .setStandbyTasks(Collections.singletonList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
-                .setSubtopology(subTopology2)
+                .setSubtopologyId(subTopology2)
                 .setPartitions(assignedTasks2)))
             .setWarmupTasks(Collections.singletonList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
-                .setSubtopology(subTopology3)
+                .setSubtopologyId(subTopology3)
                 .setPartitions(assignedTasks3)));
         String memberId = Uuid.randomUuid().toString();
         String clientId = "clientId";
@@ -355,25 +355,25 @@ public class StreamsGroupMemberTest {
             .setAssignment(
                 new StreamsGroupDescribeResponseData.Assignment()
                     .setActiveTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology1)
+                        .setSubtopologyId(subTopology1)
                         .setPartitions(assignedTasks1)))
                     .setStandbyTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology2)
+                        .setSubtopologyId(subTopology2)
                         .setPartitions(assignedTasks2)))
                     .setWarmupTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology3)
+                        .setSubtopologyId(subTopology3)
                         .setPartitions(assignedTasks3)))
             )
             .setTargetAssignment(
                 new StreamsGroupDescribeResponseData.Assignment()
                     .setActiveTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology1)
+                        .setSubtopologyId(subTopology1)
                         .setPartitions(assignedTasks3)))
                     .setStandbyTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology2)
+                        .setSubtopologyId(subTopology2)
                         .setPartitions(assignedTasks2)))
                     .setWarmupTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
-                        .setSubtopology(subTopology3)
+                        .setSubtopologyId(subTopology3)
                         .setPartitions(assignedTasks1)))
             );
         // TODO: Add TaskOffsets
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index 7ef086c9be5..8cb9c9aa645 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -45,8 +45,8 @@ public class StreamsTopologyTest {
     @Test
     public void subtopologiesShouldBeCorrect() {
         Map<String, Subtopology> subtopologies = mkMap(
-            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
-            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopologyId("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopologyId("subtopology-2"))
         );
         StreamsTopology topology = new StreamsTopology("topology-id", 
subtopologies);
         assertEquals(subtopologies, topology.subtopologies());
@@ -83,8 +83,8 @@ public class StreamsTopologyTest {
         StreamsGroupTopologyValue record = new StreamsGroupTopologyValue()
             .setTopologyId("topology-id")
             .setTopology(Arrays.asList(
-                new Subtopology().setSubtopology("subtopology-1"),
-                new Subtopology().setSubtopology("subtopology-2")
+                new Subtopology().setSubtopologyId("subtopology-1"),
+                new Subtopology().setSubtopologyId("subtopology-2")
             ));
         StreamsTopology topology = StreamsTopology.fromRecord(record);
         assertEquals("topology-id", topology.topologyId());
@@ -96,8 +96,8 @@ public class StreamsTopologyTest {
     @Test
     public void equalsShouldReturnTrueForEqualTopologies() {
         Map<String, Subtopology> subtopologies = mkMap(
-            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
-            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopologyId("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopologyId("subtopology-2"))
         );
         StreamsTopology topology1 = new StreamsTopology("topology-id", 
subtopologies);
         StreamsTopology topology2 = new StreamsTopology("topology-id", 
subtopologies);
@@ -107,10 +107,10 @@ public class StreamsTopologyTest {
     @Test
     public void equalsShouldReturnFalseForDifferentTopologies() {
         Map<String, Subtopology> subtopologies1 = mkMap(
-            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1"))
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopologyId("subtopology-1"))
         );
         Map<String, Subtopology> subtopologies2 = mkMap(
-            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopologyId("subtopology-2"))
         );
         StreamsTopology topology1 = new StreamsTopology("topology-id-1", 
subtopologies1);
         StreamsTopology topology2 = new StreamsTopology("topology-id-2", 
subtopologies2);
@@ -120,8 +120,8 @@ public class StreamsTopologyTest {
     @Test
     public void hashCodeShouldBeConsistentWithEquals() {
         Map<String, Subtopology> subtopologies = mkMap(
-            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
-            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopologyId("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopologyId("subtopology-2"))
         );
         StreamsTopology topology1 = new StreamsTopology("topology-id", 
subtopologies);
         StreamsTopology topology2 = new StreamsTopology("topology-id", 
subtopologies);
@@ -131,8 +131,8 @@ public class StreamsTopologyTest {
     @Test
     public void toStringShouldReturnCorrectRepresentation() {
         Map<String, Subtopology> subtopologies = mkMap(
-            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
-            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopologyId("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopologyId("subtopology-2"))
         );
         StreamsTopology topology = new StreamsTopology("topology-id", 
subtopologies);
         String expectedString = "StreamsTopology{topologyId=topology-id, 
subtopologies=" + subtopologies + "}";
@@ -144,7 +144,7 @@ public class StreamsTopologyTest {
         Map<String, Subtopology> subtopologies = mkMap(
             mkEntry("subtopology-1", new Subtopology()
                 .setSourceTopicRegex("regex-1")
-                .setSubtopology("subtopology-1")
+                .setSubtopologyId("subtopology-1")
                 .setSourceTopics(Collections.singletonList("source-topic-1"))
                 
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-1"))
                 .setRepartitionSourceTopics(
@@ -154,7 +154,7 @@ public class StreamsTopologyTest {
             ),
             mkEntry("subtopology-2", new Subtopology()
                 .setSourceTopicRegex("regex-2")
-                .setSubtopology("subtopology-2")
+                .setSubtopologyId("subtopology-2")
                 .setSourceTopics(Collections.singletonList("source-topic-2"))
                 
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-2"))
                 .setRepartitionSourceTopics(
@@ -167,13 +167,13 @@ public class StreamsTopologyTest {
         List<StreamsGroupDescribeResponseData.Subtopology> result = 
topology.asStreamsGroupDescribeTopology();
         assertEquals(2, result.size());
         assertEquals("regex-1", result.get(0).sourceTopicRegex());
-        assertEquals("subtopology-1", result.get(0).subtopology());
+        assertEquals("subtopology-1", result.get(0).subtopologyId());
         assertEquals(Collections.singletonList("source-topic-1"), 
result.get(0).sourceTopics());
         assertEquals(Collections.singletonList("sink-topic-1"), 
result.get(0).repartitionSinkTopics());
         assertEquals("repartition-topic-1", 
result.get(0).repartitionSourceTopics().get(0).name());
         assertEquals("changelog-topic-1", 
result.get(0).stateChangelogTopics().get(0).name());
         assertEquals("regex-2", result.get(1).sourceTopicRegex());
-        assertEquals("subtopology-2", result.get(1).subtopology());
+        assertEquals("subtopology-2", result.get(1).subtopologyId());
         assertEquals(Collections.singletonList("source-topic-2"), 
result.get(1).sourceTopics());
         assertEquals(Collections.singletonList("sink-topic-2"), 
result.get(1).repartitionSinkTopics());
         assertEquals("repartition-topic-2", 
result.get(1).repartitionSourceTopics().get(0).name());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 1679336815f..e7b52deae42 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -117,7 +117,7 @@ public class TargetAssignmentBuilderTest {
             ));
             topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numTasks);
             topology.subtopologies().put(subtopologyId, new 
StreamsGroupTopologyValue.Subtopology()
-                .setSubtopology(subtopologyId)
+                .setSubtopologyId(subtopologyId)
                 
.setSourceTopics(Collections.singletonList(topicId.toString())));
 
             return subtopologyId;


Reply via email to