This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new 9dd4f438b67 MINOR: add endpoint to sync RPCs with KIP (#18035)
9dd4f438b67 is described below
commit 9dd4f438b67f3ffef31da7e2f01f48682ae8e29b
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 4 16:06:22 2024 +0100
MINOR: add endpoint to sync RPCs with KIP (#18035)
---
.../common/message/StreamsGroupDescribeResponse.json | 20 +++++++++++++++-----
.../common/message/StreamsGroupHeartbeatRequest.json | 10 +++++-----
.../group/streams/StreamsGroupMember.java | 11 ++++++++---
.../message/StreamsGroupMemberMetadataValue.json | 4 ++--
.../group/streams/StreamsGroupMemberTest.java | 8 ++++++--
5 files changed, 36 insertions(+), 17 deletions(-)
diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 61dd4a38dae..dbd582e1f41 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -46,13 +46,13 @@
{ "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
"about": "The assignment epoch." },
- { "name": "Topology", "type": "Topology", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The topology metadata currently initialized for the
streams application. Can be null in case of a describe error.",
+ { "name": "Topology", "type": "Topology", "versions": "0+",
+ "about": "The topology metadata currently initialized for the
streams application.",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The epoch of the currently initialized topology for
this group." },
{ "name": "Subtopologies", "type": "[]Subtopology", "versions":
"0+", "nullableVersions": "0+", "default": "null",
- "about": "The subtopologies of the streams application. This
contains the configured subtopologies, where the number of partitions are set
and any regular expressions are resolved to actual topics. Null if the group is
uninitialized, source topics are missing or inconsistent.",
+ "about": "The subtopologies of the streams application. This
contains the configured subtopologies, where the number of partitions are set
and any regular expressions are resolved to actual topics. Null if the group is
uninitialized, source topics are missing or incorrectly partitioned.",
"fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "String to uniquely identify the subtopology." },
@@ -88,6 +88,8 @@
{ "name": "ProcessId", "type": "string", "versions": "0+",
"about": "Identity of the streams instance that may have
multiple clients. " },
+ { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"nullableVersions": "0+", "default": "null",
+ "about": "User-defined endpoint for Interactive Queries. Null if
not defined for this client." },
{ "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
"about": "Used for rack-aware assignment algorithm." },
{ "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+",
@@ -98,7 +100,9 @@
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions":
"0+",
- "about": "The target assignment." }
+ "about": "The target assignment." },
+ { "name": "IsClassic", "type": "bool", "versions": "0+",
+ "about": "True for classic members that have not been upgraded
yet." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+",
"default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for
this group." }
@@ -106,6 +110,12 @@
}
],
"commonStructs": [
+ { "name": "Endpoint", "versions": "0+", "fields": [
+ { "name": "Host", "type": "string", "versions": "0+",
+ "about": "host of the endpoint" },
+ { "name": "Port", "type": "uint16", "versions": "0+",
+ "about": "port of the endpoint" }
+ ]},
{ "name": "TaskOffset", "versions": "0+", "fields": [
{ "name": "SubtopologyId", "type": "string", "versions": "0+",
"about": "The subtopology identifier." },
@@ -154,4 +164,4 @@
}
]}
]
-}
+}
\ No newline at end of file
diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index d63ee0f8fd9..b031d2da07d 100644
--- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -30,15 +30,15 @@
{ "name": "InstanceId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "null if not provided or if it didn't change since the last
heartbeat; the instance ID for static membership otherwise." },
{ "name": "RackId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "null if not provided or if it didn't change since the last
heartbeat; the rack ID of consumer otherwise." },
+ "about": "null if not provided or if it didn't change since the last
heartbeat; the rack ID of the member otherwise." },
{ "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+",
"default": -1,
- "about": "-1 if it didn't change since the last heartbeat; the maximum
time in milliseconds that the coordinator will wait on the member to revoke its
partitions otherwise." },
+ "about": "-1 if it didn't change since the last heartbeat; the maximum
time in milliseconds that the coordinator will wait on the member to revoke its
tasks otherwise." },
{ "name": "Topology", "type": "Topology", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "The topology metadata of the streams application. Used to
initialize the topology of the group and to check if the topology corresponds
to the topology initialized for the group. Only sent when memberEpoch = 0, must
be non-empty. Null otherwise.",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
- "about": "The epoch of the topology. Used to check if the topology
corresponds to the topology initialized on the brokers. Must be non-zero." },
+ "about": "The epoch of the topology. Used to check if the topology
corresponds to the topology initialized on the brokers." },
{ "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
"about": "The sub-topologies of the streams application.",
"fields": [
@@ -78,7 +78,7 @@
{ "name": "ProcessId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "Identity of the streams instance that may have multiple
consumers. Null if unchanged since last heartbeat." },
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "User-defined endpoint for Interactive Queries. Null if
unchanged since last heartbeat." },
+ "about": "User-defined endpoint for Interactive Queries. Null if
unchanged since last heartbeat, or if not defined on the client." },
{ "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "Used for rack-aware assignment algorithm. Null if unchanged
since last heartbeat." },
@@ -129,4 +129,4 @@
"about": "The partitions of the input topics processed by this
member." }
]}
]
-}
+}
\ No newline at end of file
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 144d1198063..b73f1ce5459 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -621,9 +621,14 @@ public class StreamsGroupMember {
.setValue(entry.getValue())
).collect(Collectors.toList()))
.setProcessId(processId)
- .setTopologyEpoch(topologyEpoch);
- // TODO: TaskOffset and TaskEndOffset are missing.
-
+ .setTopologyEpoch(topologyEpoch)
+ .setUserEndpoint(
+ userEndpoint == null ? null :
+ new StreamsGroupDescribeResponseData.Endpoint()
+ .setHost(userEndpoint.host())
+ .setPort(userEndpoint.port())
+ );
+ // TODO: TaskOffset, TaskEndOffset, IsClassic are to be implemented.
}
private static List<StreamsGroupDescribeResponseData.TaskIds>
taskIdsFromMap(
diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
index 07ef8d4c252..0725ccb1cbc 100644
--- a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
+++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
@@ -21,7 +21,7 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "InstanceId", "versions": "0+", "nullableVersions": "0+",
"type": "string",
- "about": "The (optional) instance id." },
+ "about": "The (optional) instance ID for static membership." },
{ "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type":
"string",
"about": "The (optional) rack id." },
{ "name": "ClientId", "versions": "0+", "type": "string",
@@ -36,7 +36,7 @@
{ "name": "ProcessId", "type": "string", "versions": "0+",
"about": "Identity of the streams instance that may have multiple
consumers." },
- { "name": "UserEndpoint", "type": "Endpoint", "nullableVersions": "0+",
"versions": "0+",
+ { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "User-defined endpoint for running interactive queries on this
instance." },
{ "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
"about": "Used for rack-aware assignment algorithm." }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 5af3a225577..0120e6f11ec 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -340,6 +340,9 @@ public class StreamsGroupMemberTest {
.setAssignedWarmupTasks(
mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks3)))
)
+ .setUserEndpoint(
+ new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)
+ )
.build();
StreamsGroupDescribeResponseData.Member actual =
member.asStreamsGroupDescribeMember(targetAssignment);
@@ -376,8 +379,9 @@ public class StreamsGroupMemberTest {
.setWarmupTasks(Collections.singletonList(new
StreamsGroupDescribeResponseData.TaskIds()
.setSubtopologyId(subTopology3)
.setPartitions(assignedTasks1)))
- );
- // TODO: Add TaskOffsets
+ )
+ .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint().setHost("host").setPort(9090));
+ // TODO: TaskOffset, TaskEndOffset, IsClassic are to be implemented.
assertEquals(expected, actual);
}