This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 41669d978ecb4ae58961f735df9c7a2b718ec8e6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Jun 7 11:55:06 2024 +0200

    Minor: Rename TaskId to TaskIds
    
    See https://github.com/lucasbru/kafka/pull/16
---
 .../internals/StreamsHeartbeatRequestManager.java      | 14 +++++++-------
 .../common/message/StreamsHeartbeatRequest.json        | 18 ++++++++++--------
 .../common/message/StreamsHeartbeatResponse.json       |  8 ++++----
 .../message/StreamsInstallAssignmentRequest.json       |  8 ++++----
 .../message/StreamsPrepareAssignmentResponse.json      | 14 ++++++++------
 .../internals/StreamsHeartbeatRequestManagerTest.java  |  6 +++---
 .../StreamsGroupCurrentMemberAssignmentValue.json      |  8 ++++----
 .../StreamsGroupTargetAssignmentMemberValue.json       |  8 ++++----
 8 files changed, 44 insertions(+), 40 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
index ffa6bb2657f..4f4d80104fd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
 import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
-import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskIds;
 import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo;
 import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -329,7 +329,7 @@ public class StreamsHeartbeatRequestManager implements 
RequestManager {
     }
 
     private void updateTaskIdCollection(
-        final List<StreamsHeartbeatResponseData.TaskId> source,
+        final List<StreamsHeartbeatResponseData.TaskIds> source,
         final Set<StreamsAssignmentInterface.TaskId> target
     ) {
         target.clear();
@@ -605,7 +605,7 @@ public class StreamsHeartbeatRequestManager implements 
RequestManager {
             return data;
         }
 
-        private List<TaskId> convertTaskIdCollection(final 
Set<StreamsAssignmentInterface.TaskId> tasks) {
+        private List<TaskIds> convertTaskIdCollection(final 
Set<StreamsAssignmentInterface.TaskId> tasks) {
             return tasks.stream()
                 .collect(
                     
Collectors.groupingBy(StreamsAssignmentInterface.TaskId::subtopologyId,
@@ -614,10 +614,10 @@ public class StreamsHeartbeatRequestManager implements 
RequestManager {
                 .entrySet()
                 .stream()
                 .map(entry -> {
-                    TaskId id = new TaskId();
-                    id.setSubtopology(entry.getKey());
-                    id.setPartitions(entry.getValue());
-                    return id;
+                    TaskIds ids = new TaskIds();
+                    ids.setSubtopology(entry.getKey());
+                    ids.setPartitions(entry.getValue());
+                    return ids;
                 })
                 .collect(Collectors.toList());
         }
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
index cd8df8c477e..2ac6ce2e6e9 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -39,11 +39,11 @@
     { "name": "Assignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": null,
       "about": "The desired assignor. Picked by majority vote among clients. 
If null, the client votes for the broker to pick the assignor." },
 
-    { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
-    { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local standby tasks for this consumer. Null if unchanged since 
last heartbeat." },
-    { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local warming up tasks for this consumer. Null if unchanged 
since last heartbeat." },
 
     { "name": "ProcessId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
@@ -65,8 +65,10 @@
     { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Cumulative offsets for assigned and dormant standby tasks. 
Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since 
last heartbeat.",
       "fields": [
-        { "name": "TaskId", "type": "TaskId", "versions": "0+",
-          "about": "The task ID" },
+        { "name": "Subtopology", "type": "string", "versions": "0+",
+          "about": "A unique identifier of the subtopology." },
+        { "name": "Partition", "type": "int32", "versions": "0+",
+          "about": "The partition of the input topics." },
         { "name": "Offset", "type": "int64", "versions": "0+",
           "about": "The cumulative offset for the task" }
       ]
@@ -79,11 +81,11 @@
       "about": "Whether all Streams clients in the group should shut down." }
   ],
   "commonStructs": [
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       { "name": "Subtopology", "type": "string", "versions": "0+",
-        "about": "subtopology ID" },
+        "about": "A unique identifier of the subtopology." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
-        "about": "partitions" }
+        "about": "The partitions of the input topics." }
     ]},
     { "name": "KeyValue", "versions": "0+",
       "fields": [
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
index 513e1d78911..2a3e7f1ea80 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
@@ -62,11 +62,11 @@
       "about": "An error message relating to the current state of the 
assignment to this member." },
 
     // The streams app knows which partitions to fetch from given this 
information
-    { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
-    { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local standby tasks for this consumer. Null if unchanged since 
last heartbeat." },
-    { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local warming up tasks for this consumer. Null if unchanged 
since last heartbeat." },
 
     // IQ-related information
@@ -89,7 +89,7 @@
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "partitions" }
     ]},
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       // We are using strings here, to avoid fixing a structure on task IDs 
which will
       // allow introducing stable naming for subtopologies in the future.
       { "name": "Subtopology", "type": "string", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
index 27d15cc66ec..847bb9bb215 100644
--- 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
@@ -20,16 +20,16 @@
       "about": "The members.", "fields": [
       { "name": "MemberId", "type": "string", "versions": "0+",
         "about": "The member ID." },
-      { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Local assignment for this consumer." },
-      { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Local standby tasks for this consumer." },
-      { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Local warming up tasks for this consumer." }
     ]}
   ],
   "commonStructs": [
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       { "name": "Subtopology", "type": "string", "versions": "0+",
         "about": "subtopology ID" },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
index c917931a568..1d9622caa9e 100644
--- 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
@@ -39,11 +39,11 @@
       { "name": "TopologyHash", "type": "bytes", "versions": "0+",
         "about": "The hash of the topology." },
 
-      { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Target active tasks for this consumer." },
-      { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Target standby tasks for this consumer." },
-      { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+",
+      { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
         "about": "Target warming up tasks for this consumer. " },
 
       { "name": "ProcessId", "type": "uuid", "versions": "0+",
@@ -55,8 +55,10 @@
       { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
         "about": "Cumulative offsets for assigned and dormant standby tasks. 
Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since 
last heartbeat.",
         "fields": [
-          { "name": "TaskId", "type": "TaskId", "versions": "0+",
-            "about": "The task ID" },
+          { "name": "Subtopology", "type": "string", "versions": "0+",
+            "about": "A unique identifier of the subtopology." },
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition of the input topics." },
           { "name": "Offset", "type": "int64", "versions": "0+",
             "about": "The cumulative offset for the task" }
         ]
@@ -78,7 +80,7 @@
       ]}
   ],
   "commonStructs": [
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       { "name": "Subtopology", "type": "string", "versions": "0+",
         "about": "subtopology ID" },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
index 17b99c9c72f..37e89bfdd69 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
@@ -292,11 +292,11 @@ class StreamsHeartbeatRequestManagerTest {
             .setThrottleTimeMs(TEST_THROTTLE_TIME_MS)
             .setHeartbeatIntervalMs(1000)
             .setActiveTasks(Collections.singletonList(
-                new 
StreamsHeartbeatResponseData.TaskId().setSubtopology("0").setPartitions(Collections.singletonList(0))))
+                new 
StreamsHeartbeatResponseData.TaskIds().setSubtopology("0").setPartitions(Collections.singletonList(0))))
             .setStandbyTasks(Collections.singletonList(
-                new 
StreamsHeartbeatResponseData.TaskId().setSubtopology("1").setPartitions(Collections.singletonList(1))))
+                new 
StreamsHeartbeatResponseData.TaskIds().setSubtopology("1").setPartitions(Collections.singletonList(1))))
             .setWarmupTasks(Collections.singletonList(
-                new 
StreamsHeartbeatResponseData.TaskId().setSubtopology("2").setPartitions(Collections.singletonList(2))));
+                new 
StreamsHeartbeatResponseData.TaskIds().setSubtopology("2").setPartitions(Collections.singletonList(2))));
 
         mockResponse(data);
 
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index d79e58cba75..0cbe1ba1e4e 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -26,17 +26,17 @@
       "about": "If the last epoch bump is lost before reaching the member, the 
member will retry with the previous epoch." },
     { "name": "State", "versions": "0+", "type": "int8",
       "about": "The member state. See StreamsGroupMember.MemberState for the 
possible values." },
-    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned active tasks for this streams client." },
-    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned standby tasks for this streams client." },
-    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned warming up tasks for this streams client." 
},
     { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": 
"[]TaskId",
       "about": "The active tasks that must be revoked by this member." }
   ],
   "commonStructs": [
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       { "name": "Subtopology", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
index 38526e4381c..06cb84ab18b 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json
@@ -20,15 +20,15 @@
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
-    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned active tasks for this streams client." },
-    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned standby tasks for this streams client." },
-    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId",
+    { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned warming up tasks for this streams client." }
   ],
   "commonStructs": [
-    { "name": "TaskId", "versions": "0+", "fields": [
+    { "name": "TaskIds", "versions": "0+", "fields": [
       { "name": "Subtopology", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",

Reply via email to