This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8b6094c4e328f04cfd92afa2c094c2dd94ef3904 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Oct 7 13:01:15 2024 +0200 MINOR: Use subtopologyId and spelling consistently (#17370) Throughout RPCs, we were using subtopology instead of subtopologyId for the string ID of a subtopology. We were also using the "sub-topology" spelling in some places. This commit cleans up both before the automatic topic creation code is merged. --- .../StreamsGroupHeartbeatRequestManager.java | 8 +++--- .../message/StreamsGroupDescribeResponse.json | 14 +++++----- .../message/StreamsGroupHeartbeatRequest.json | 8 +++--- .../message/StreamsGroupHeartbeatResponse.json | 2 +- .../message/StreamsGroupInitializeRequest.json | 8 +++--- .../StreamsGroupHeartbeatRequestManagerTest.java | 6 ++-- .../coordinator/group/GroupMetadataManager.java | 4 +-- .../coordinator/group/streams/Assignment.java | 6 ++-- .../streams/CoordinatorStreamsRecordHelpers.java | 10 +++---- .../group/streams/CurrentAssignmentBuilder.java | 8 +++--- .../group/streams/StreamsGroupMember.java | 4 +-- .../coordinator/group/streams/StreamsTopology.java | 6 ++-- .../StreamsGroupCurrentMemberAssignmentValue.json | 2 +- .../StreamsGroupTargetAssignmentMemberValue.json | 2 +- .../common/message/StreamsGroupTopologyValue.json | 6 ++-- .../coordinator/group/streams/AssignmentTest.java | 12 ++++---- .../CoordinatorStreamsRecordHelpersTest.java | 2 +- .../streams/CurrentAssignmentBuilderTest.java | 4 +-- .../group/streams/StreamsGroupMemberTest.java | 30 ++++++++++---------- .../group/streams/StreamsTopologyTest.java | 32 +++++++++++----------- .../group/streams/TargetAssignmentBuilderTest.java | 2 +- 21 files changed, 88 insertions(+), 88 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index f41ed9d0998..3a80765206d 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -271,8 +271,8 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { private void setTargetAssignmentForConsumerGroup(final StreamsGroupHeartbeatResponseData data, final ConsumerGroupHeartbeatResponseData cgData) { Map<String, TopicPartitions> tps = new HashMap<>(); data.activeTasks().forEach(taskId -> Stream.concat( - streamsInterface.subtopologyMap().get(taskId.subtopology()).sourceTopics.stream(), - streamsInterface.subtopologyMap().get(taskId.subtopology()).repartitionSourceTopics.keySet().stream() + streamsInterface.subtopologyMap().get(taskId.subtopologyId()).sourceTopics.stream(), + streamsInterface.subtopologyMap().get(taskId.subtopologyId()).repartitionSourceTopics.keySet().stream() ) .forEach(topic -> { final TopicPartitions toInsert = tps.computeIfAbsent(topic, k -> { @@ -323,7 +323,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { target.clear(); source.forEach(taskId -> { taskId.partitions().forEach(partition -> { - target.add(new StreamsAssignmentInterface.TaskId(taskId.subtopology(), partition)); + target.add(new StreamsAssignmentInterface.TaskId(taskId.subtopologyId(), partition)); }); }); } @@ -602,7 +602,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager { .stream() .map(entry -> { TaskIds ids = new TaskIds(); - ids.setSubtopology(entry.getKey()); + ids.setSubtopologyId(entry.getKey()); ids.setPartitions(entry.getValue()); return ids; }) diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 9cf580f0441..227ac495c2d 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -48,8 +48,8 @@ { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The sub-topologies of the streams application. Null if uninitialized.", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "String to uniquely identify the sub-topology." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "string", "versions": "0+", @@ -57,7 +57,7 @@ { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", "about": "The repartition topics the topology writes to." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of state changelog topics associated with this sub-topology. Created automatically." }, + "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Created automatically." } ] @@ -103,8 +103,8 @@ ], "commonStructs": [ { "name": "TaskOffset", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "The sub-topology identifier." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition." }, { "name": "Offset", "type": "int64", "versions": "0+", @@ -127,8 +127,8 @@ "about": "Warm-up tasks for this client. 
" } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "The sub-topology identifier." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } ]}, diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json index 7f829f1e410..ec28c2766fb 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -72,16 +72,16 @@ "about": "value of the config" } ]}, { "name": "TaskOffset", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "The sub-topology identifier." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition." }, { "name": "Offset", "type": "int64", "versions": "0+", "about": "The offset." } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "The sub-topology identifier." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." 
} ]} diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index ac11b3d8264..40e134dc407 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -96,7 +96,7 @@ "about": "partitions" } ]}, { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", + { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } diff --git a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json index 3223d1e8d9e..05c4709092c 100644 --- a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json @@ -29,15 +29,15 @@ "about": "The sub-topologies of the streams application.", "fields": [ { "name": "SubtopologyId", "type": "string", "versions": "0+", - "about": "String to uniquely identify the sub-topology. Deterministically generated from the topology" }, + "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The regular expressions identifying topics the sub-topology reads from." }, + "about": "The regular expressions identifying topics the subtopology reads from." 
}, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of state changelog topics associated with this sub-topology. Created automatically." }, + "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", - "about": "The repartition topics the sub-topology writes to." }, + "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. Created automatically." } ] diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 503576d8fd5..a635d7d418e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -282,11 +282,11 @@ class StreamsGroupHeartbeatRequestManagerTest { .setThrottleTimeMs(TEST_THROTTLE_TIME_MS) .setHeartbeatIntervalMs(1000) .setActiveTasks(Collections.singletonList( - new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("0").setPartitions(Collections.singletonList(0)))) + new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("0").setPartitions(Collections.singletonList(0)))) .setStandbyTasks(Collections.singletonList( - new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("1").setPartitions(Collections.singletonList(1)))) + new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("1").setPartitions(Collections.singletonList(1)))) .setWarmupTasks(Collections.singletonList( - new 
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("2").setPartitions(Collections.singletonList(2)))); + new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("2").setPartitions(Collections.singletonList(2)))); mockResponse(data); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index dfb768f9fb5..80785af9c76 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1778,7 +1778,7 @@ public class GroupMetadataManager { if (ownedTasks == null) return false; for (StreamsGroupHeartbeatRequestData.TaskIds topicPartitions : ownedTasks) { - Set<Integer> partitions = target.get(topicPartitions.subtopology()); + Set<Integer> partitions = target.get(topicPartitions.subtopologyId()); if (partitions == null) return false; for (Integer partitionId : topicPartitions.partitions()) { if (!partitions.contains(partitionId)) return false; @@ -2438,7 +2438,7 @@ public class GroupMetadataManager { private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>> taskIds) { return taskIds.entrySet().stream() .map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds() - .setSubtopology(entry.getKey()) + .setSubtopologyId(entry.getKey()) .setPartitions(new ArrayList<>(entry.getValue()))) .collect(Collectors.toList()); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java index b80e34cbfe5..09a83323072 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java @@ -108,19 +108,19 @@ public class Assignment { return new Assignment( record.activeTasks().stream() .collect(Collectors.toMap( - StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology, + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, taskId -> new HashSet<>(taskId.partitions()) ) ), record.standbyTasks().stream() .collect(Collectors.toMap( - StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology, + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, taskId -> new HashSet<>(taskId.partitions()) ) ), record.warmupTasks().stream() .collect(Collectors.toMap( - StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology, + StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId, taskId -> new HashSet<>(taskId.partitions()) ) ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java index 48a31fdd4c7..7a7ee7f2ae2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java @@ -213,7 +213,7 @@ public class CoordinatorStreamsRecordHelpers { for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) { activeTaskIds.add( new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(entry.getKey()) + .setSubtopologyId(entry.getKey()) .setPartitions(new ArrayList<>(entry.getValue())) ); } @@ -221,7 +221,7 @@ public class CoordinatorStreamsRecordHelpers { for (Map.Entry<String, Set<Integer>> entry : standbyTasks.entrySet()) { standbyTaskIds.add( new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(entry.getKey()) + 
.setSubtopologyId(entry.getKey()) .setPartitions(new ArrayList<>(entry.getValue())) ); } @@ -229,7 +229,7 @@ public class CoordinatorStreamsRecordHelpers { for (Map.Entry<String, Set<Integer>> entry : warmupTasks.entrySet()) { warmupTaskIds.add( new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(entry.getKey()) + .setSubtopologyId(entry.getKey()) .setPartitions(new ArrayList<>(entry.getValue())) ); } @@ -366,7 +366,7 @@ public class CoordinatorStreamsRecordHelpers { List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new ArrayList<>(tasks.size()); tasks.forEach((subtopologyId, partitions) -> taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() - .setSubtopology(subtopologyId) + .setSubtopologyId(subtopologyId) .setPartitions(new ArrayList<>(partitions))) ); return taskIds; @@ -399,7 +399,7 @@ public class CoordinatorStreamsRecordHelpers { return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); }).collect(Collectors.toList()); - value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId()) + value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId()) .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java index 24e8a2f2a6c..974f465a740 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java @@ -168,17 +168,17 @@ public class 
CurrentAssignmentBuilder { } else { this.ownedActiveTasks = ownedAssignment.activeTasks().entrySet().stream() .map(activeTaskIds -> new StreamsGroupHeartbeatRequestData.TaskIds() - .setSubtopology(activeTaskIds.getKey()) + .setSubtopologyId(activeTaskIds.getKey()) .setPartitions(new ArrayList<>(activeTaskIds.getValue()))) .collect(Collectors.toList()); this.ownedStandbyTasks = ownedAssignment.standbyTasks().entrySet().stream() .map(standbyTaskIds -> new StreamsGroupHeartbeatRequestData.TaskIds() - .setSubtopology(standbyTaskIds.getKey()) + .setSubtopologyId(standbyTaskIds.getKey()) .setPartitions(new ArrayList<>(standbyTaskIds.getValue()))) .collect(Collectors.toList()); this.ownedWarmupTasks = ownedAssignment.warmupTasks().entrySet().stream() .map(warmupTaskIds -> new StreamsGroupHeartbeatRequestData.TaskIds() - .setSubtopology(warmupTaskIds.getKey()) + .setSubtopologyId(warmupTaskIds.getKey()) .setPartitions(new ArrayList<>(warmupTaskIds.getValue()))) .collect(Collectors.toList()); } @@ -329,7 +329,7 @@ public class CurrentAssignmentBuilder { for (StreamsGroupHeartbeatRequestData.TaskIds owned : ownedTasks) { Set<Integer> tasksPendingRevocation = - assignment.getOrDefault(owned.subtopology(), Collections.emptySet()); + assignment.getOrDefault(owned.subtopologyId(), Collections.emptySet()); for (Integer partitionId : owned.partitions()) { if (tasksPendingRevocation.contains(partitionId)) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java index 7caa1f77ff0..12d97d32e62 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -277,7 +277,7 @@ public class StreamsGroupMember { List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
topicPartitionsList ) { return topicPartitionsList.stream().collect(Collectors.toMap( - StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopology, + StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId, taskIds -> Collections.unmodifiableSet(new HashSet<>(taskIds.partitions())))); } @@ -632,7 +632,7 @@ public class StreamsGroupMember { List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>(); tasks.forEach((subtopologyId, partitionSet) -> { taskIds.add(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subtopologyId) + .setSubtopologyId(subtopologyId) .setPartitions(new ArrayList<>(partitionSet))); }); return taskIds; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index ca2fd9a6da0..16b8f6d5178 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -68,7 +68,7 @@ public class StreamsTopology { this.subtopologies.put( subtopology.subtopologyId(), - new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId()) + new StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId()) .setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); }); @@ -92,7 +92,7 @@ public class StreamsTopology { public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) { return new StreamsTopology( record.topologyId(), - record.topology().stream().collect(Collectors.toMap(Subtopology::subtopology, x -> x)) + record.topology().stream().collect(Collectors.toMap(Subtopology::subtopologyId, x -> x)) ); } @@ -125,7 +125,7 @@ 
public class StreamsTopology { return subtopologies.values().stream().map( subtopology -> new StreamsGroupDescribeResponseData.Subtopology() .setSourceTopicRegex(subtopology.sourceTopicRegex()) - .setSubtopology(subtopology.subtopology()) + .setSubtopologyId(subtopology.subtopologyId()) .setSourceTopics(subtopology.sourceTopics()) .setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) .setRepartitionSourceTopics( diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json index bcb0e6b6eed..8d393de7b6c 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json @@ -41,7 +41,7 @@ ], "commonStructs": [ { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", + { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json index f33e0ace34c..88424950082 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json @@ -29,7 +29,7 @@ ], "commonStructs": [ { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", + { "name": "SubtopologyId", "type": "string", "versions": "0+", "about": "The subtopology identifier." 
}, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions of the input topics processed by this member." } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json index 5ba633095b6..af1fae06aad 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json @@ -25,8 +25,8 @@ { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "String to uniquely identify the subtopology." }, + { "name": "SubtopologyId", "type": "string", "versions": "0+", + "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", @@ -34,7 +34,7 @@ { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this subtopology. " }, { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", - "about": "The repartition topics the sub-topology writes to." }, + "about": "The repartition topics the subtopology writes to." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of source topics that are internally created repartition topics. 
" } ] diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java index 57a48e2d365..65675136eb7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java @@ -64,24 +64,24 @@ public class AssignmentTest { String subtopology2 = "subtopology2"; List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = new ArrayList<>(); activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology1) + .setSubtopologyId(subtopology1) .setPartitions(Arrays.asList(1, 2, 3))); activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology2) + .setSubtopologyId(subtopology2) .setPartitions(Arrays.asList(4, 5, 6))); List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = new ArrayList<>(); standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology1) + .setSubtopologyId(subtopology1) .setPartitions(Arrays.asList(7, 8, 9))); standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology2) + .setSubtopologyId(subtopology2) .setPartitions(Arrays.asList(1, 2, 3))); List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = new ArrayList<>(); warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology1) + .setSubtopologyId(subtopology1) .setPartitions(Arrays.asList(4, 5, 6))); warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds() - .setSubtopology(subtopology2) + .setSubtopologyId(subtopology2) .setPartitions(Arrays.asList(7, 8, 9))); StreamsGroupTargetAssignmentMemberValue record = new StreamsGroupTargetAssignmentMemberValue() diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java index 0b291bd098e..839897d4dae 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java @@ -64,7 +64,7 @@ class CoordinatorStreamsRecordHelpersTest { List<StreamsGroupTopologyValue.Subtopology> expectedTopology = Collections.singletonList(new StreamsGroupTopologyValue.Subtopology() - .setSubtopology("subtopology-id") + .setSubtopologyId("subtopology-id") .setRepartitionSinkTopics(Collections.singletonList("foo")) .setSourceTopics(Collections.singletonList("bar")) .setRepartitionSourceTopics( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java index 93e40214dd8..d6f83025c7f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java @@ -418,10 +418,10 @@ public class CurrentAssignmentBuilderTest { .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Collections.emptySet()) .withOwnedActiveTasks(Arrays.asList( new StreamsGroupHeartbeatRequestData.TaskIds() - .setSubtopology(subtopologyId1) + .setSubtopologyId(subtopologyId1) .setPartitions(Arrays.asList(2, 3)), new StreamsGroupHeartbeatRequestData.TaskIds() - .setSubtopology(subtopologyId2) + .setSubtopologyId(subtopologyId2) .setPartitions(Arrays.asList(5, 6)))) .withOwnedStandbyTasks(Collections.emptyList()) 
.withOwnedWarmupTasks(Collections.emptyList()) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java index 0e5a0ffef07..00f6ef6c676 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java @@ -248,27 +248,27 @@ public class StreamsGroupMemberTest { .setPreviousMemberEpoch(9) .setState((byte) 1) .setActiveTasks(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId1) + .setSubtopologyId(subtopologyId1) .setPartitions(Arrays.asList(1, 2))) ) .setStandbyTasks(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId2) + .setSubtopologyId(subtopologyId2) .setPartitions(Arrays.asList(3, 4))) ) .setWarmupTasks(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId1) + .setSubtopologyId(subtopologyId1) .setPartitions(Arrays.asList(5, 6))) ) .setActiveTasksPendingRevocation(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId2) + .setSubtopologyId(subtopologyId2) .setPartitions(Arrays.asList(7, 8))) ) .setStandbyTasksPendingRevocation(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId1) + .setSubtopologyId(subtopologyId1) .setPartitions(Arrays.asList(9, 10))) ) .setWarmupTasksPendingRevocation(Collections.singletonList(new TaskIds() - .setSubtopology(subtopologyId2) + .setSubtopologyId(subtopologyId2) .setPartitions(Arrays.asList(11, 12))) ); @@ -300,13 +300,13 @@ public class StreamsGroupMemberTest { .setMemberEpoch(epoch) .setPreviousMemberEpoch(epoch - 1) .setActiveTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() - .setSubtopology(subTopology1) + .setSubtopologyId(subTopology1) .setPartitions(assignedTasks1))) 
.setStandbyTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() - .setSubtopology(subTopology2) + .setSubtopologyId(subTopology2) .setPartitions(assignedTasks2))) .setWarmupTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() - .setSubtopology(subTopology3) + .setSubtopologyId(subTopology3) .setPartitions(assignedTasks3))); String memberId = Uuid.randomUuid().toString(); String clientId = "clientId"; @@ -355,25 +355,25 @@ public class StreamsGroupMemberTest { .setAssignment( new StreamsGroupDescribeResponseData.Assignment() .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology1) + .setSubtopologyId(subTopology1) .setPartitions(assignedTasks1))) .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology2) + .setSubtopologyId(subTopology2) .setPartitions(assignedTasks2))) .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology3) + .setSubtopologyId(subTopology3) .setPartitions(assignedTasks3))) ) .setTargetAssignment( new StreamsGroupDescribeResponseData.Assignment() .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology1) + .setSubtopologyId(subTopology1) .setPartitions(assignedTasks3))) .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology2) + .setSubtopologyId(subTopology2) .setPartitions(assignedTasks2))) .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() - .setSubtopology(subTopology3) + .setSubtopologyId(subTopology3) .setPartitions(assignedTasks1))) ); // TODO: Add TaskOffsets diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java index 7ef086c9be5..8cb9c9aa645 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -45,8 +45,8 @@ public class StreamsTopologyTest { @Test public void subtopologiesShouldBeCorrect() { Map<String, Subtopology> subtopologies = mkMap( - mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), - mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + mkEntry("subtopology-1", new Subtopology().setSubtopologyId("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopologyId("subtopology-2")) ); StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); assertEquals(subtopologies, topology.subtopologies()); @@ -83,8 +83,8 @@ public class StreamsTopologyTest { StreamsGroupTopologyValue record = new StreamsGroupTopologyValue() .setTopologyId("topology-id") .setTopology(Arrays.asList( - new Subtopology().setSubtopology("subtopology-1"), - new Subtopology().setSubtopology("subtopology-2") + new Subtopology().setSubtopologyId("subtopology-1"), + new Subtopology().setSubtopologyId("subtopology-2") )); StreamsTopology topology = StreamsTopology.fromRecord(record); assertEquals("topology-id", topology.topologyId()); @@ -96,8 +96,8 @@ public class StreamsTopologyTest { @Test public void equalsShouldReturnTrueForEqualTopologies() { Map<String, Subtopology> subtopologies = mkMap( - mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), - mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + mkEntry("subtopology-1", new Subtopology().setSubtopologyId("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopologyId("subtopology-2")) ); StreamsTopology topology1 = new 
StreamsTopology("topology-id", subtopologies); StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies); @@ -107,10 +107,10 @@ public class StreamsTopologyTest { @Test public void equalsShouldReturnFalseForDifferentTopologies() { Map<String, Subtopology> subtopologies1 = mkMap( - mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")) + mkEntry("subtopology-1", new Subtopology().setSubtopologyId("subtopology-1")) ); Map<String, Subtopology> subtopologies2 = mkMap( - mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + mkEntry("subtopology-2", new Subtopology().setSubtopologyId("subtopology-2")) ); StreamsTopology topology1 = new StreamsTopology("topology-id-1", subtopologies1); StreamsTopology topology2 = new StreamsTopology("topology-id-2", subtopologies2); @@ -120,8 +120,8 @@ public class StreamsTopologyTest { @Test public void hashCodeShouldBeConsistentWithEquals() { Map<String, Subtopology> subtopologies = mkMap( - mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), - mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + mkEntry("subtopology-1", new Subtopology().setSubtopologyId("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopologyId("subtopology-2")) ); StreamsTopology topology1 = new StreamsTopology("topology-id", subtopologies); StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies); @@ -131,8 +131,8 @@ public class StreamsTopologyTest { @Test public void toStringShouldReturnCorrectRepresentation() { Map<String, Subtopology> subtopologies = mkMap( - mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), - mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + mkEntry("subtopology-1", new Subtopology().setSubtopologyId("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopologyId("subtopology-2")) ); StreamsTopology topology = new 
StreamsTopology("topology-id", subtopologies); String expectedString = "StreamsTopology{topologyId=topology-id, subtopologies=" + subtopologies + "}"; @@ -144,7 +144,7 @@ public class StreamsTopologyTest { Map<String, Subtopology> subtopologies = mkMap( mkEntry("subtopology-1", new Subtopology() .setSourceTopicRegex("regex-1") - .setSubtopology("subtopology-1") + .setSubtopologyId("subtopology-1") .setSourceTopics(Collections.singletonList("source-topic-1")) .setRepartitionSinkTopics(Collections.singletonList("sink-topic-1")) .setRepartitionSourceTopics( @@ -154,7 +154,7 @@ public class StreamsTopologyTest { ), mkEntry("subtopology-2", new Subtopology() .setSourceTopicRegex("regex-2") - .setSubtopology("subtopology-2") + .setSubtopologyId("subtopology-2") .setSourceTopics(Collections.singletonList("source-topic-2")) .setRepartitionSinkTopics(Collections.singletonList("sink-topic-2")) .setRepartitionSourceTopics( @@ -167,13 +167,13 @@ public class StreamsTopologyTest { List<StreamsGroupDescribeResponseData.Subtopology> result = topology.asStreamsGroupDescribeTopology(); assertEquals(2, result.size()); assertEquals("regex-1", result.get(0).sourceTopicRegex()); - assertEquals("subtopology-1", result.get(0).subtopology()); + assertEquals("subtopology-1", result.get(0).subtopologyId()); assertEquals(Collections.singletonList("source-topic-1"), result.get(0).sourceTopics()); assertEquals(Collections.singletonList("sink-topic-1"), result.get(0).repartitionSinkTopics()); assertEquals("repartition-topic-1", result.get(0).repartitionSourceTopics().get(0).name()); assertEquals("changelog-topic-1", result.get(0).stateChangelogTopics().get(0).name()); assertEquals("regex-2", result.get(1).sourceTopicRegex()); - assertEquals("subtopology-2", result.get(1).subtopology()); + assertEquals("subtopology-2", result.get(1).subtopologyId()); assertEquals(Collections.singletonList("source-topic-2"), result.get(1).sourceTopics()); assertEquals(Collections.singletonList("sink-topic-2"), 
result.get(1).repartitionSinkTopics()); assertEquals("repartition-topic-2", result.get(1).repartitionSourceTopics().get(0).name()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index 1679336815f..e7b52deae42 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -117,7 +117,7 @@ public class TargetAssignmentBuilderTest { )); topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numTasks); topology.subtopologies().put(subtopologyId, new StreamsGroupTopologyValue.Subtopology() - .setSubtopology(subtopologyId) + .setSubtopologyId(subtopologyId) .setSourceTopics(Collections.singletonList(topicId.toString()))); return subtopologyId;
