This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5eb61c087cae561f115098b2c6fe0738c41853b2 Author: Lucas Brutschy <[email protected]> AuthorDate: Wed Apr 17 17:28:38 2024 +0200 Very initial (compiling) streams RPCs. Basic initial RPCS. See https://github.com/lucasbru/kafka/pull/6 --- .../common/message/StreamsHeartbeatRequest.json | 87 ++++++++++++++++++++ .../common/message/StreamsHeartbeatResponse.json | 96 ++++++++++++++++++++++ 2 files changed, 183 insertions(+) diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json new file mode 100644 index 00000000000..a5f52e262f2 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 76, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "StreamsHeartbeatRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." }, + { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, + "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, + + // If the topology hash does not match the server's topology hash, return INVALID_TOPOLOGY and send ShouldInitializeTopology to one consumer + { "name": "TopologyHash", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": null, + "about": "The hash of the topology. Only sent when memberEpoch = 0. Null otherwise. Used to check if topology corresponds to server-topology. " }, + + // The coordinator knows which partitions to fetch from given this information + { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, + { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local standby tasks for this consumer. Null if unchanged since last heartbeat." }, + { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local warming up tasks for this consumer. Null if unchanged since last heartbeat." }, + + { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." }, + { "name": "UserEndPoint", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "End-point a user can use for IQ. Null if unchanged since last heartbeat." }, + { "name": "ClientTags", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." }, + + { "name": "TaskLagUpdateReason", "type": "int8", "versions": "0+", "default": 0, + "about": "Reason for updating task lag. 0 if no update, 1 if task lag is updated due to warm up being ready, 2 if warm up failed" }, + { "name": "TaskLag", "type": "[]TaskLag", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Cumulative lags for standby tasks. Only updatable when TaskLagUpdateReason is not zero. Null if unchanged since last heartbeat.", + "fields": [ + { "name": "TaskId", "type": "TaskId", "versions": "0+", + "about": "The task ID" }, + { "name": "Lag", "type": "int64", "versions": "0+", + "about": "The cumulative lag for the task" } + ] + }, + { "name": "AssignmentConfigs", "type": "[]AssignmentConfig", "versions": "0+","nullableVersions": "0+", "default": "null", + "about": "Generic array of assignment configuration strings. Null if unchanged since last heartbeat.", + "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ] + } + ], + "commonStructs": [ + { "name": "TaskId", "versions": "0+", "fields": [ + // We are using strings here, to avoid fixing a structure on task IDs which will + // allow introducing stable naming for subtopologies in the future. + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "subtopology ID" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json new file mode 100644 index 00000000000..0ab1b0f0ea0 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 76, + "type": "response", + "name": "StreamsHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_MEMBER_ID (version 0+) + // - FENCED_MEMBER_EPOCH (version 0+) + // - UNSUPPORTED_ASSIGNOR (version 0+) + // - UNRELEASED_INSTANCE_ID (version 0+) + // - GROUP_MAX_SIZE_REACHED (version 0+) + + // Streams-specific errors: + // - INCONSISTENT_TOPOLOGY (version 0+) + // - MISSING_SOURCE_TOPICS (version 0+) + // - MISSING_INTERNAL_TOPICS (version 0+) + "fields": [ + // Same as consumer group heart beat + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, + { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+", + "about": "The heartbeat interval in milliseconds." }, + + // Assigning specific roles to the consumer + { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+", "default": false, + "about": "true if this streams application should compute the next assignment." }, + { "name": "ShouldInitializeTopology", "type": "bool", "versions": "0+", "default": false, + "about": "true if this streams application should initialize the topology on the broker" }, + + // The streams app knows which partitions to fetch from given this information + { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, + { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local standby tasks for this consumer. Null if unchanged since last heartbeat." }, + { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Local warming up tasks for this consumer. Null if unchanged since last heartbeat." }, + + // IQ-related information + { "name": "PartitionsByHost", "type": "[]HostInfo", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Global assignment information used for IQ. Null if unchanged since last heartbeat." , + "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "Hostname of the node" }, + { "name": "Port", "type": "int32", "versions": "0+", + "about": "Port of the node" }, + { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+", + "about": "All partitions available on the node" } + ] + } + ], + "commonStructs": [ + { "name": "TopicPartition", "versions": "0+", "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "topic name" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]}, + { "name": "TaskId", "versions": "0+", "fields": [ + // We are using strings here, to avoid fixing a structure on task IDs which will + // allow introducing stable naming for subtopologies in the future. + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "subtopology ID" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]} + ] +}
