This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 41669d978ecb4ae58961f735df9c7a2b718ec8e6 Author: Lucas Brutschy <[email protected]> AuthorDate: Fri Jun 7 11:55:06 2024 +0200 Minor: Rename TaskId to TaskIds See https://github.com/lucasbru/kafka/pull/16 --- .../internals/StreamsHeartbeatRequestManager.java | 14 +++++++------- .../common/message/StreamsHeartbeatRequest.json | 18 ++++++++++-------- .../common/message/StreamsHeartbeatResponse.json | 8 ++++---- .../message/StreamsInstallAssignmentRequest.json | 8 ++++---- .../message/StreamsPrepareAssignmentResponse.json | 14 ++++++++------ .../internals/StreamsHeartbeatRequestManagerTest.java | 6 +++--- .../StreamsGroupCurrentMemberAssignmentValue.json | 8 ++++---- .../StreamsGroupTargetAssignmentMemberValue.json | 8 ++++---- 8 files changed, 44 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java index ffa6bb2657f..4f4d80104fd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; import org.apache.kafka.common.message.StreamsHeartbeatRequestData; -import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskIds; import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo; import org.apache.kafka.common.message.StreamsHeartbeatResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -329,7 +329,7 @@ public class StreamsHeartbeatRequestManager implements RequestManager { } private void updateTaskIdCollection( - final List<StreamsHeartbeatResponseData.TaskId> source, + final List<StreamsHeartbeatResponseData.TaskIds> source, final Set<StreamsAssignmentInterface.TaskId> target ) { target.clear(); @@ -605,7 +605,7 @@ public class StreamsHeartbeatRequestManager implements RequestManager { return data; } - private List<TaskId> convertTaskIdCollection(final Set<StreamsAssignmentInterface.TaskId> tasks) { + private List<TaskIds> convertTaskIdCollection(final Set<StreamsAssignmentInterface.TaskId> tasks) { return tasks.stream() .collect( Collectors.groupingBy(StreamsAssignmentInterface.TaskId::subtopologyId, @@ -614,10 +614,10 @@ public class StreamsHeartbeatRequestManager implements RequestManager { .entrySet() .stream() .map(entry -> { - TaskId id = new TaskId(); - id.setSubtopology(entry.getKey()); - id.setPartitions(entry.getValue()); - return id; + TaskIds ids = new TaskIds(); + ids.setSubtopology(entry.getKey()); + ids.setPartitions(entry.getValue()); + return ids; }) .collect(Collectors.toList()); } diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json index cd8df8c477e..2ac6ce2e6e9 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -39,11 +39,11 @@ { "name": "Assignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": null, "about": "The desired assignor. Picked by majority vote among clients. If null, the client votes for the broker to pick the assignor." }, - { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, - { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local standby tasks for this consumer. Null if unchanged since last heartbeat." }, - { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local warming up tasks for this consumer. Null if unchanged since last heartbeat." }, { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", @@ -65,8 +65,10 @@ { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Cumulative offsets for assigned and dormant standby tasks. Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since last heartbeat.", "fields": [ - { "name": "TaskId", "type": "TaskId", "versions": "0+", - "about": "The task ID" }, + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "A unique identifier of the subtopology." }, + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition of the input topics." }, { "name": "Offset", "type": "int64", "versions": "0+", "about": "The cumulative offset for the task" } ] @@ -79,11 +81,11 @@ "about": "Whether all Streams clients in the group should shut down." } ], "commonStructs": [ - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "subtopology ID" }, + "about": "A unique identifier of the subtopology." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", - "about": "partitions" } + "about": "The partition of the input topics." } ]}, { "name": "KeyValue", "versions": "0+", "fields": [ diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json index 513e1d78911..2a3e7f1ea80 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json @@ -62,11 +62,11 @@ "about": "An error message relating to the current state of the assignment to this member." }, // The streams app knows which partitions to fetch from given this information - { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, - { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local standby tasks for this consumer. Null if unchanged since last heartbeat." }, - { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local warming up tasks for this consumer. Null if unchanged since last heartbeat." }, // IQ-related information @@ -89,7 +89,7 @@ { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "partitions" } ]}, - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ // We are using strings here, to avoid fixing a structure on task IDs which will // allow introducing stable naming for subtopologies in the future. { "name": "Subtopology", "type": "string", "versions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json index 27d15cc66ec..847bb9bb215 100644 --- a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json +++ b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json @@ -20,16 +20,16 @@ "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, - { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "about": "Local assignment for this consumer." }, - { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "about": "Local standby tasks for this consumer." }, - { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "about": "Local warming up tasks for this consumer." } ]} ], "commonStructs": [ - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", "about": "subtopology ID" }, { "name": "Partitions", "type": "[]int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json index c917931a568..1d9622caa9e 100644 --- a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json +++ b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json @@ -39,11 +39,11 @@ { "name": "TopologyHash", "type": "bytes", "versions": "0+", "about": "The hash of the topology." }, - { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", + { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "about": "Target active tasks for this consumer." }, - { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", + { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", "about": "Target standby tasks for this consumer." }, - { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", + { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", "about": "Target warming up tasks for this consumer. " }, { "name": "ProcessId", "type": "uuid", "versions": "0+", @@ -55,8 +55,10 @@ { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Cumulative offsets for assigned and dormant standby tasks. Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since last heartbeat.", "fields": [ - { "name": "TaskId", "type": "TaskId", "versions": "0+", - "about": "The task ID" }, + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "A unique identifier of the subtopology." }, + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition of the input topics." }, { "name": "Offset", "type": "int64", "versions": "0+", "about": "The cumulative offset for the task" } ] @@ -78,7 +80,7 @@ ]} ], "commonStructs": [ - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", "about": "subtopology ID" }, { "name": "Partitions", "type": "[]int32", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java index 17b99c9c72f..37e89bfdd69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java @@ -292,11 +292,11 @@ class StreamsHeartbeatRequestManagerTest { .setThrottleTimeMs(TEST_THROTTLE_TIME_MS) .setHeartbeatIntervalMs(1000) .setActiveTasks(Collections.singletonList( - new StreamsHeartbeatResponseData.TaskId().setSubtopology("0").setPartitions(Collections.singletonList(0)))) + new StreamsHeartbeatResponseData.TaskIds().setSubtopology("0").setPartitions(Collections.singletonList(0)))) .setStandbyTasks(Collections.singletonList( - new StreamsHeartbeatResponseData.TaskId().setSubtopology("1").setPartitions(Collections.singletonList(1)))) + new StreamsHeartbeatResponseData.TaskIds().setSubtopology("1").setPartitions(Collections.singletonList(1)))) .setWarmupTasks(Collections.singletonList( - new StreamsHeartbeatResponseData.TaskId().setSubtopology("2").setPartitions(Collections.singletonList(2)))); + new StreamsHeartbeatResponseData.TaskIds().setSubtopology("2").setPartitions(Collections.singletonList(2)))); mockResponse(data); diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json index d79e58cba75..0cbe1ba1e4e 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json @@ -26,17 +26,17 @@ "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, { "name": "State", "versions": "0+", "type": "int8", "about": "The member state. See StreamsGroupMember.MemberState for the possible values." }, - { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId", + { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned active tasks for this streams client." }, - { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId", + { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned standby tasks for this streams client." }, - { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId", + { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned warming up tasks for this streams client." }, { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskId", "about": "The active tasks that must be revoked by this member." } ], "commonStructs": [ - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json index 38526e4381c..06cb84ab18b 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json @@ -20,15 +20,15 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId", + { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned active tasks for this streams client." }, - { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId", + { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned standby tasks for this streams client." }, - { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId", + { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds", "about": "Currently assigned warming up tasks for this streams client." } ], "commonStructs": [ - { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "TaskIds", "versions": "0+", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", "about": "The subtopology identifier." }, { "name": "Partitions", "type": "[]int32", "versions": "0+",
