This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new c1341b83d92 MINOR: Use subtopologyId and spelling consistently (#17370)
c1341b83d92 is described below
commit c1341b83d925b31c61acd99a3453f651cae92a7f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Oct 7 13:01:15 2024 +0200
MINOR: Use subtopologyId and spelling consistently (#17370)
Throughout the RPCs, we were using subtopology instead of subtopologyId
for the string ID of a subtopology. We were also inconsistently using the
"sub-topology" spelling. This commit cleans both up before the auto-topic
creation code is merged.
---
.../StreamsGroupHeartbeatRequestManager.java | 8 +++---
.../message/StreamsGroupDescribeResponse.json | 14 +++++-----
.../message/StreamsGroupHeartbeatRequest.json | 8 +++---
.../message/StreamsGroupHeartbeatResponse.json | 2 +-
.../message/StreamsGroupInitializeRequest.json | 8 +++---
.../StreamsGroupHeartbeatRequestManagerTest.java | 6 ++--
.../coordinator/group/GroupMetadataManager.java | 4 +--
.../coordinator/group/streams/Assignment.java | 6 ++--
.../streams/CoordinatorStreamsRecordHelpers.java | 10 +++----
.../group/streams/CurrentAssignmentBuilder.java | 8 +++---
.../group/streams/StreamsGroupMember.java | 4 +--
.../coordinator/group/streams/StreamsTopology.java | 6 ++--
.../StreamsGroupCurrentMemberAssignmentValue.json | 2 +-
.../StreamsGroupTargetAssignmentMemberValue.json | 2 +-
.../common/message/StreamsGroupTopologyValue.json | 6 ++--
.../coordinator/group/streams/AssignmentTest.java | 12 ++++----
.../CoordinatorStreamsRecordHelpersTest.java | 2 +-
.../streams/CurrentAssignmentBuilderTest.java | 4 +--
.../group/streams/StreamsGroupMemberTest.java | 30 ++++++++++----------
.../group/streams/StreamsTopologyTest.java | 32 +++++++++++-----------
.../group/streams/TargetAssignmentBuilderTest.java | 2 +-
21 files changed, 88 insertions(+), 88 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index f41ed9d0998..3a80765206d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -271,8 +271,8 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
private void setTargetAssignmentForConsumerGroup(final
StreamsGroupHeartbeatResponseData data, final
ConsumerGroupHeartbeatResponseData cgData) {
Map<String, TopicPartitions> tps = new HashMap<>();
data.activeTasks().forEach(taskId -> Stream.concat(
-
streamsInterface.subtopologyMap().get(taskId.subtopology()).sourceTopics.stream(),
-
streamsInterface.subtopologyMap().get(taskId.subtopology()).repartitionSourceTopics.keySet().stream()
+
streamsInterface.subtopologyMap().get(taskId.subtopologyId()).sourceTopics.stream(),
+
streamsInterface.subtopologyMap().get(taskId.subtopologyId()).repartitionSourceTopics.keySet().stream()
)
.forEach(topic -> {
final TopicPartitions toInsert = tps.computeIfAbsent(topic, k
-> {
@@ -323,7 +323,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
target.clear();
source.forEach(taskId -> {
taskId.partitions().forEach(partition -> {
- target.add(new
StreamsAssignmentInterface.TaskId(taskId.subtopology(), partition));
+ target.add(new
StreamsAssignmentInterface.TaskId(taskId.subtopologyId(), partition));
});
});
}
@@ -602,7 +602,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
.stream()
.map(entry -> {
TaskIds ids = new TaskIds();
- ids.setSubtopology(entry.getKey());
+ ids.setSubtopologyId(entry.getKey());
ids.setPartitions(entry.getValue());
return ids;
})
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 9cf580f0441..227ac495c2d 100644
---
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -48,8 +48,8 @@
{ "name": "Topology", "type": "[]Subtopology", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "The sub-topologies of the streams application. Null if
uninitialized.",
"fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "String to uniquely identify the sub-topology." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
{ "name": "SourceTopicRegex", "type": "string", "versions": "0+",
@@ -57,7 +57,7 @@
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the topology writes to." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of state changelog topics associated with this
sub-topology. Created automatically." },
+ "about": "The set of state changelog topics associated with this
subtopology. Created automatically." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
"about": "The set of source topics that are internally created
repartition topics. Created automatically." }
]
@@ -103,8 +103,8 @@
],
"commonStructs": [
{ "name": "TaskOffset", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "The sub-topology identifier." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "The subtopology identifier." },
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition." },
{ "name": "Offset", "type": "int64", "versions": "0+",
@@ -127,8 +127,8 @@
"about": "Warm-up tasks for this client. " }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "The sub-topology identifier." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this
member." }
]},
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 7f829f1e410..ec28c2766fb 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -72,16 +72,16 @@
"about": "value of the config" }
]},
{ "name": "TaskOffset", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "The sub-topology identifier." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "The subtopology identifier." },
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition." },
{ "name": "Offset", "type": "int64", "versions": "0+",
"about": "The offset." }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "The sub-topology identifier." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this
member." }
]}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index ac11b3d8264..40e134dc407 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -96,7 +96,7 @@
"about": "partitions" }
]},
{ "name": "TaskIds", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this
member." }
diff --git
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 3223d1e8d9e..05c4709092c 100644
---
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -29,15 +29,15 @@
"about": "The sub-topologies of the streams application.",
"fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
- "about": "String to uniquely identify the sub-topology.
Deterministically generated from the topology" },
+ "about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
{ "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The regular expressions identifying topics the
sub-topology reads from." },
+ "about": "The regular expressions identifying topics the subtopology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
- "about": "The set of state changelog topics associated with this
sub-topology. Created automatically." },
+ "about": "The set of state changelog topics associated with this
subtopology. Created automatically." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
- "about": "The repartition topics the sub-topology writes to." },
+ "about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
"about": "The set of source topics that are internally created
repartition topics. Created automatically." }
]
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 503576d8fd5..a635d7d418e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -282,11 +282,11 @@ class StreamsGroupHeartbeatRequestManagerTest {
.setThrottleTimeMs(TEST_THROTTLE_TIME_MS)
.setHeartbeatIntervalMs(1000)
.setActiveTasks(Collections.singletonList(
- new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("0").setPartitions(Collections.singletonList(0))))
+ new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("0").setPartitions(Collections.singletonList(0))))
.setStandbyTasks(Collections.singletonList(
- new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("1").setPartitions(Collections.singletonList(1))))
+ new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("1").setPartitions(Collections.singletonList(1))))
.setWarmupTasks(Collections.singletonList(
- new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopology("2").setPartitions(Collections.singletonList(2))));
+ new
StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId("2").setPartitions(Collections.singletonList(2))));
mockResponse(data);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fd9615afba0..5f8acad2af1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1712,7 +1712,7 @@ public class GroupMetadataManager {
if (ownedTasks == null) return false;
for (StreamsGroupHeartbeatRequestData.TaskIds topicPartitions :
ownedTasks) {
- Set<Integer> partitions =
target.get(topicPartitions.subtopology());
+ Set<Integer> partitions =
target.get(topicPartitions.subtopologyId());
if (partitions == null) return false;
for (Integer partitionId : topicPartitions.partitions()) {
if (!partitions.contains(partitionId)) return false;
@@ -2354,7 +2354,7 @@ public class GroupMetadataManager {
private List<StreamsGroupHeartbeatResponseData.TaskIds>
createStreamsGroupHeartbeatResponseTaskIds(final Map<String, Set<Integer>>
taskIds) {
return taskIds.entrySet().stream()
.map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds()
- .setSubtopology(entry.getKey())
+ .setSubtopologyId(entry.getKey())
.setPartitions(new ArrayList<>(entry.getValue())))
.collect(Collectors.toList());
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
index b80e34cbfe5..09a83323072 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -108,19 +108,19 @@ public class Assignment {
return new Assignment(
record.activeTasks().stream()
.collect(Collectors.toMap(
-
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.standbyTasks().stream()
.collect(Collectors.toMap(
-
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
),
record.warmupTasks().stream()
.collect(Collectors.toMap(
-
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopology,
+
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
taskId -> new HashSet<>(taskId.partitions())
)
)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 48a31fdd4c7..7a7ee7f2ae2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -213,7 +213,7 @@ public class CoordinatorStreamsRecordHelpers {
for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) {
activeTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(entry.getKey())
+ .setSubtopologyId(entry.getKey())
.setPartitions(new ArrayList<>(entry.getValue()))
);
}
@@ -221,7 +221,7 @@ public class CoordinatorStreamsRecordHelpers {
for (Map.Entry<String, Set<Integer>> entry : standbyTasks.entrySet()) {
standbyTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(entry.getKey())
+ .setSubtopologyId(entry.getKey())
.setPartitions(new ArrayList<>(entry.getValue()))
);
}
@@ -229,7 +229,7 @@ public class CoordinatorStreamsRecordHelpers {
for (Map.Entry<String, Set<Integer>> entry : warmupTasks.entrySet()) {
warmupTaskIds.add(
new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(entry.getKey())
+ .setSubtopologyId(entry.getKey())
.setPartitions(new ArrayList<>(entry.getValue()))
);
}
@@ -366,7 +366,7 @@ public class CoordinatorStreamsRecordHelpers {
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIds = new
ArrayList<>(tasks.size());
tasks.forEach((subtopologyId, partitions) ->
taskIds.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
- .setSubtopology(subtopologyId)
+ .setSubtopologyId(subtopologyId)
.setPartitions(new ArrayList<>(partitions)))
);
return taskIds;
@@ -399,7 +399,7 @@ public class CoordinatorStreamsRecordHelpers {
return new
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
}).collect(Collectors.toList());
- value.topology().add(new
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
+ value.topology().add(new
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
});
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
index 24e8a2f2a6c..974f465a740 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -168,17 +168,17 @@ public class CurrentAssignmentBuilder {
} else {
this.ownedActiveTasks =
ownedAssignment.activeTasks().entrySet().stream()
.map(activeTaskIds -> new
StreamsGroupHeartbeatRequestData.TaskIds()
- .setSubtopology(activeTaskIds.getKey())
+ .setSubtopologyId(activeTaskIds.getKey())
.setPartitions(new ArrayList<>(activeTaskIds.getValue())))
.collect(Collectors.toList());
this.ownedStandbyTasks =
ownedAssignment.standbyTasks().entrySet().stream()
.map(standbyTaskIds -> new
StreamsGroupHeartbeatRequestData.TaskIds()
- .setSubtopology(standbyTaskIds.getKey())
+ .setSubtopologyId(standbyTaskIds.getKey())
.setPartitions(new ArrayList<>(standbyTaskIds.getValue())))
.collect(Collectors.toList());
this.ownedWarmupTasks =
ownedAssignment.warmupTasks().entrySet().stream()
.map(warmupTaskIds -> new
StreamsGroupHeartbeatRequestData.TaskIds()
- .setSubtopology(warmupTaskIds.getKey())
+ .setSubtopologyId(warmupTaskIds.getKey())
.setPartitions(new ArrayList<>(warmupTaskIds.getValue())))
.collect(Collectors.toList());
}
@@ -329,7 +329,7 @@ public class CurrentAssignmentBuilder {
for (StreamsGroupHeartbeatRequestData.TaskIds owned : ownedTasks) {
Set<Integer> tasksPendingRevocation =
- assignment.getOrDefault(owned.subtopology(),
Collections.emptySet());
+ assignment.getOrDefault(owned.subtopologyId(),
Collections.emptySet());
for (Integer partitionId : owned.partitions()) {
if (tasksPendingRevocation.contains(partitionId)) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 7caa1f77ff0..12d97d32e62 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -277,7 +277,7 @@ public class StreamsGroupMember {
List<StreamsGroupCurrentMemberAssignmentValue.TaskIds>
topicPartitionsList
) {
return topicPartitionsList.stream().collect(Collectors.toMap(
- StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopology,
+
StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
taskIds -> Collections.unmodifiableSet(new
HashSet<>(taskIds.partitions()))));
}
@@ -632,7 +632,7 @@ public class StreamsGroupMember {
List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new
ArrayList<>();
tasks.forEach((subtopologyId, partitionSet) -> {
taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subtopologyId)
+ .setSubtopologyId(subtopologyId)
.setPartitions(new ArrayList<>(partitionSet)));
});
return taskIds;
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index ca2fd9a6da0..16b8f6d5178 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -68,7 +68,7 @@ public class StreamsTopology {
this.subtopologies.put(
subtopology.subtopologyId(),
- new
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
+ new
StreamsGroupTopologyValue.Subtopology().setSubtopologyId(subtopology.subtopologyId())
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
});
@@ -92,7 +92,7 @@ public class StreamsTopology {
public static StreamsTopology fromRecord(StreamsGroupTopologyValue record)
{
return new StreamsTopology(
record.topologyId(),
-
record.topology().stream().collect(Collectors.toMap(Subtopology::subtopology, x
-> x))
+
record.topology().stream().collect(Collectors.toMap(Subtopology::subtopologyId,
x -> x))
);
}
@@ -125,7 +125,7 @@ public class StreamsTopology {
return subtopologies.values().stream().map(
subtopology -> new StreamsGroupDescribeResponseData.Subtopology()
.setSourceTopicRegex(subtopology.sourceTopicRegex())
- .setSubtopology(subtopology.subtopology())
+ .setSubtopologyId(subtopology.subtopologyId())
.setSourceTopics(subtopology.sourceTopics())
.setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
.setRepartitionSourceTopics(
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index bcb0e6b6eed..8d393de7b6c 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -41,7 +41,7 @@
],
"commonStructs": [
{ "name": "TaskIds", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this
member." }
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
index f33e0ace34c..88424950082 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
@@ -29,7 +29,7 @@
],
"commonStructs": [
{ "name": "TaskIds", "versions": "0+", "fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of the input topics processed by this
member." }
diff --git
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
index 5ba633095b6..af1fae06aad 100644
---
a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
+++
b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json
@@ -25,8 +25,8 @@
{ "name": "Topology", "type": "[]Subtopology", "versions": "0+",
"about": "The sub-topologies of the streams application.",
"fields": [
- { "name": "Subtopology", "type": "string", "versions": "0+",
- "about": "String to uniquely identify the subtopology." },
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
{ "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
@@ -34,7 +34,7 @@
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
"about": "The set of state changelog topics associated with this
subtopology. " },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
- "about": "The repartition topics the sub-topology writes to." },
+ "about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
"about": "The set of source topics that are internally created
repartition topics. " }
]
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
index 57a48e2d365..65675136eb7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -64,24 +64,24 @@ public class AssignmentTest {
String subtopology2 = "subtopology2";
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks =
new ArrayList<>();
activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology1)
+ .setSubtopologyId(subtopology1)
.setPartitions(Arrays.asList(1, 2, 3)));
activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology2)
+ .setSubtopologyId(subtopology2)
.setPartitions(Arrays.asList(4, 5, 6)));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks =
new ArrayList<>();
standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology1)
+ .setSubtopologyId(subtopology1)
.setPartitions(Arrays.asList(7, 8, 9)));
standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology2)
+ .setSubtopologyId(subtopology2)
.setPartitions(Arrays.asList(1, 2, 3)));
List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks =
new ArrayList<>();
warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology1)
+ .setSubtopologyId(subtopology1)
.setPartitions(Arrays.asList(4, 5, 6)));
warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
- .setSubtopology(subtopology2)
+ .setSubtopologyId(subtopology2)
.setPartitions(Arrays.asList(7, 8, 9)));
StreamsGroupTargetAssignmentMemberValue record = new
StreamsGroupTargetAssignmentMemberValue()
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index 0b291bd098e..839897d4dae 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -64,7 +64,7 @@ class CoordinatorStreamsRecordHelpersTest {
List<StreamsGroupTopologyValue.Subtopology> expectedTopology =
Collections.singletonList(new
StreamsGroupTopologyValue.Subtopology()
- .setSubtopology("subtopology-id")
+ .setSubtopologyId("subtopology-id")
.setRepartitionSinkTopics(Collections.singletonList("foo"))
.setSourceTopics(Collections.singletonList("bar"))
.setRepartitionSourceTopics(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
index 93e40214dd8..d6f83025c7f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
@@ -418,10 +418,10 @@ public class CurrentAssignmentBuilderTest {
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
Collections.emptySet())
.withOwnedActiveTasks(Arrays.asList(
new StreamsGroupHeartbeatRequestData.TaskIds()
- .setSubtopology(subtopologyId1)
+ .setSubtopologyId(subtopologyId1)
.setPartitions(Arrays.asList(2, 3)),
new StreamsGroupHeartbeatRequestData.TaskIds()
- .setSubtopology(subtopologyId2)
+ .setSubtopologyId(subtopologyId2)
.setPartitions(Arrays.asList(5, 6))))
.withOwnedStandbyTasks(Collections.emptyList())
.withOwnedWarmupTasks(Collections.emptyList())
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 0e5a0ffef07..00f6ef6c676 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -248,27 +248,27 @@ public class StreamsGroupMemberTest {
.setPreviousMemberEpoch(9)
.setState((byte) 1)
.setActiveTasks(Collections.singletonList(new TaskIds()
- .setSubtopology(subtopologyId1)
+ .setSubtopologyId(subtopologyId1)
.setPartitions(Arrays.asList(1, 2)))
)
.setStandbyTasks(Collections.singletonList(new TaskIds()
- .setSubtopology(subtopologyId2)
+ .setSubtopologyId(subtopologyId2)
.setPartitions(Arrays.asList(3, 4)))
)
.setWarmupTasks(Collections.singletonList(new TaskIds()
- .setSubtopology(subtopologyId1)
+ .setSubtopologyId(subtopologyId1)
.setPartitions(Arrays.asList(5, 6)))
)
.setActiveTasksPendingRevocation(Collections.singletonList(new
TaskIds()
- .setSubtopology(subtopologyId2)
+ .setSubtopologyId(subtopologyId2)
.setPartitions(Arrays.asList(7, 8)))
)
.setStandbyTasksPendingRevocation(Collections.singletonList(new
TaskIds()
- .setSubtopology(subtopologyId1)
+ .setSubtopologyId(subtopologyId1)
.setPartitions(Arrays.asList(9, 10)))
)
.setWarmupTasksPendingRevocation(Collections.singletonList(new
TaskIds()
- .setSubtopology(subtopologyId2)
+ .setSubtopologyId(subtopologyId2)
.setPartitions(Arrays.asList(11, 12)))
);
@@ -300,13 +300,13 @@ public class StreamsGroupMemberTest {
.setMemberEpoch(epoch)
.setPreviousMemberEpoch(epoch - 1)
.setActiveTasks(Collections.singletonList(new
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
- .setSubtopology(subTopology1)
+ .setSubtopologyId(subTopology1)
.setPartitions(assignedTasks1)))
.setStandbyTasks(Collections.singletonList(new
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
- .setSubtopology(subTopology2)
+ .setSubtopologyId(subTopology2)
.setPartitions(assignedTasks2)))
.setWarmupTasks(Collections.singletonList(new
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
- .setSubtopology(subTopology3)
+ .setSubtopologyId(subTopology3)
.setPartitions(assignedTasks3)));
String memberId = Uuid.randomUuid().toString();
String clientId = "clientId";
@@ -355,25 +355,25 @@ public class StreamsGroupMemberTest {
.setAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology1)
+ .setSubtopologyId(subTopology1)
.setPartitions(assignedTasks1)))
.setStandbyTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology2)
+ .setSubtopologyId(subTopology2)
.setPartitions(assignedTasks2)))
.setWarmupTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology3)
+ .setSubtopologyId(subTopology3)
.setPartitions(assignedTasks3)))
)
.setTargetAssignment(
new StreamsGroupDescribeResponseData.Assignment()
.setActiveTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology1)
+ .setSubtopologyId(subTopology1)
.setPartitions(assignedTasks3)))
.setStandbyTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology2)
+ .setSubtopologyId(subTopology2)
.setPartitions(assignedTasks2)))
.setWarmupTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
- .setSubtopology(subTopology3)
+ .setSubtopologyId(subTopology3)
.setPartitions(assignedTasks1)))
);
// TODO: Add TaskOffsets
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
index 7ef086c9be5..8cb9c9aa645 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -45,8 +45,8 @@ public class StreamsTopologyTest {
@Test
public void subtopologiesShouldBeCorrect() {
Map<String, Subtopology> subtopologies = mkMap(
- mkEntry("subtopology-1", new
Subtopology().setSubtopology("subtopology-1")),
- mkEntry("subtopology-2", new
Subtopology().setSubtopology("subtopology-2"))
+ mkEntry("subtopology-1", new
Subtopology().setSubtopologyId("subtopology-1")),
+ mkEntry("subtopology-2", new
Subtopology().setSubtopologyId("subtopology-2"))
);
StreamsTopology topology = new StreamsTopology("topology-id",
subtopologies);
assertEquals(subtopologies, topology.subtopologies());
@@ -83,8 +83,8 @@ public class StreamsTopologyTest {
StreamsGroupTopologyValue record = new StreamsGroupTopologyValue()
.setTopologyId("topology-id")
.setTopology(Arrays.asList(
- new Subtopology().setSubtopology("subtopology-1"),
- new Subtopology().setSubtopology("subtopology-2")
+ new Subtopology().setSubtopologyId("subtopology-1"),
+ new Subtopology().setSubtopologyId("subtopology-2")
));
StreamsTopology topology = StreamsTopology.fromRecord(record);
assertEquals("topology-id", topology.topologyId());
@@ -96,8 +96,8 @@ public class StreamsTopologyTest {
@Test
public void equalsShouldReturnTrueForEqualTopologies() {
Map<String, Subtopology> subtopologies = mkMap(
- mkEntry("subtopology-1", new
Subtopology().setSubtopology("subtopology-1")),
- mkEntry("subtopology-2", new
Subtopology().setSubtopology("subtopology-2"))
+ mkEntry("subtopology-1", new
Subtopology().setSubtopologyId("subtopology-1")),
+ mkEntry("subtopology-2", new
Subtopology().setSubtopologyId("subtopology-2"))
);
StreamsTopology topology1 = new StreamsTopology("topology-id",
subtopologies);
StreamsTopology topology2 = new StreamsTopology("topology-id",
subtopologies);
@@ -107,10 +107,10 @@ public class StreamsTopologyTest {
@Test
public void equalsShouldReturnFalseForDifferentTopologies() {
Map<String, Subtopology> subtopologies1 = mkMap(
- mkEntry("subtopology-1", new
Subtopology().setSubtopology("subtopology-1"))
+ mkEntry("subtopology-1", new
Subtopology().setSubtopologyId("subtopology-1"))
);
Map<String, Subtopology> subtopologies2 = mkMap(
- mkEntry("subtopology-2", new
Subtopology().setSubtopology("subtopology-2"))
+ mkEntry("subtopology-2", new
Subtopology().setSubtopologyId("subtopology-2"))
);
StreamsTopology topology1 = new StreamsTopology("topology-id-1",
subtopologies1);
StreamsTopology topology2 = new StreamsTopology("topology-id-2",
subtopologies2);
@@ -120,8 +120,8 @@ public class StreamsTopologyTest {
@Test
public void hashCodeShouldBeConsistentWithEquals() {
Map<String, Subtopology> subtopologies = mkMap(
- mkEntry("subtopology-1", new
Subtopology().setSubtopology("subtopology-1")),
- mkEntry("subtopology-2", new
Subtopology().setSubtopology("subtopology-2"))
+ mkEntry("subtopology-1", new
Subtopology().setSubtopologyId("subtopology-1")),
+ mkEntry("subtopology-2", new
Subtopology().setSubtopologyId("subtopology-2"))
);
StreamsTopology topology1 = new StreamsTopology("topology-id",
subtopologies);
StreamsTopology topology2 = new StreamsTopology("topology-id",
subtopologies);
@@ -131,8 +131,8 @@ public class StreamsTopologyTest {
@Test
public void toStringShouldReturnCorrectRepresentation() {
Map<String, Subtopology> subtopologies = mkMap(
- mkEntry("subtopology-1", new
Subtopology().setSubtopology("subtopology-1")),
- mkEntry("subtopology-2", new
Subtopology().setSubtopology("subtopology-2"))
+ mkEntry("subtopology-1", new
Subtopology().setSubtopologyId("subtopology-1")),
+ mkEntry("subtopology-2", new
Subtopology().setSubtopologyId("subtopology-2"))
);
StreamsTopology topology = new StreamsTopology("topology-id",
subtopologies);
String expectedString = "StreamsTopology{topologyId=topology-id, subtopologies=" + subtopologies + "}";
@@ -144,7 +144,7 @@ public class StreamsTopologyTest {
Map<String, Subtopology> subtopologies = mkMap(
mkEntry("subtopology-1", new Subtopology()
.setSourceTopicRegex("regex-1")
- .setSubtopology("subtopology-1")
+ .setSubtopologyId("subtopology-1")
.setSourceTopics(Collections.singletonList("source-topic-1"))
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-1"))
.setRepartitionSourceTopics(
@@ -154,7 +154,7 @@ public class StreamsTopologyTest {
),
mkEntry("subtopology-2", new Subtopology()
.setSourceTopicRegex("regex-2")
- .setSubtopology("subtopology-2")
+ .setSubtopologyId("subtopology-2")
.setSourceTopics(Collections.singletonList("source-topic-2"))
.setRepartitionSinkTopics(Collections.singletonList("sink-topic-2"))
.setRepartitionSourceTopics(
@@ -167,13 +167,13 @@ public class StreamsTopologyTest {
List<StreamsGroupDescribeResponseData.Subtopology> result =
topology.asStreamsGroupDescribeTopology();
assertEquals(2, result.size());
assertEquals("regex-1", result.get(0).sourceTopicRegex());
- assertEquals("subtopology-1", result.get(0).subtopology());
+ assertEquals("subtopology-1", result.get(0).subtopologyId());
assertEquals(Collections.singletonList("source-topic-1"),
result.get(0).sourceTopics());
assertEquals(Collections.singletonList("sink-topic-1"),
result.get(0).repartitionSinkTopics());
assertEquals("repartition-topic-1",
result.get(0).repartitionSourceTopics().get(0).name());
assertEquals("changelog-topic-1",
result.get(0).stateChangelogTopics().get(0).name());
assertEquals("regex-2", result.get(1).sourceTopicRegex());
- assertEquals("subtopology-2", result.get(1).subtopology());
+ assertEquals("subtopology-2", result.get(1).subtopologyId());
assertEquals(Collections.singletonList("source-topic-2"),
result.get(1).sourceTopics());
assertEquals(Collections.singletonList("sink-topic-2"),
result.get(1).repartitionSinkTopics());
assertEquals("repartition-topic-2",
result.get(1).repartitionSourceTopics().get(0).name());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 1679336815f..e7b52deae42 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -117,7 +117,7 @@ public class TargetAssignmentBuilderTest {
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numTasks);
topology.subtopologies().put(subtopologyId, new
StreamsGroupTopologyValue.Subtopology()
- .setSubtopology(subtopologyId)
+ .setSubtopologyId(subtopologyId)
.setSourceTopics(Collections.singletonList(topicId.toString())));
return subtopologyId;