This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5eb61c087cae561f115098b2c6fe0738c41853b2
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Apr 17 17:28:38 2024 +0200

    Very initial (compiling) streams RPCs.
    
    Basic initial RPCS.
    
    See https://github.com/lucasbru/kafka/pull/6
---
 .../common/message/StreamsHeartbeatRequest.json    | 87 ++++++++++++++++++++
 .../common/message/StreamsHeartbeatResponse.json   | 96 ++++++++++++++++++++++
 2 files changed, 183 insertions(+)

diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
new file mode 100644
index 00000000000..a5f52e262f2
--- /dev/null
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 76,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "StreamsHeartbeatRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The member id generated by the coordinator. The member id must 
be kept during the entire lifetime of the member." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The current member epoch; 0 to join the group; -1 to leave the 
group; -2 to indicate that the static member will rejoin." },
+    { "name": "InstanceId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last 
heartbeat; the instance Id otherwise." },
+    { "name": "RackId", "type": "string", "versions": "0+",  
"nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last 
heartbeat; the rack ID of consumer otherwise." },
+    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
+      "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
+
+    // If the topology hash does not match the server's topology hash, return 
INCONSISTENT_TOPOLOGY and send ShouldInitializeTopology to one consumer
+    { "name": "TopologyHash", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The hash of the topology. Only sent when MemberEpoch = 0. Null 
otherwise. Used to check whether the topology matches the server's topology." },
+
+    // The coordinator knows which partitions to fetch from given this 
information
+    { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
+    { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local standby tasks for this consumer. Null if unchanged since 
last heartbeat." },
+    { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local warming up tasks for this consumer. Null if unchanged 
since last heartbeat." },
+
+    { "name": "ProcessId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Identity of the streams instance that may have multiple 
consumers. Null if unchanged since last heartbeat." },
+    { "name": "UserEndPoint", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "End-point a user can use for IQ. Null if unchanged since last 
heartbeat." },
+    { "name": "ClientTags", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Used for rack-aware assignment algorithm. Null if unchanged 
since last heartbeat." },
+
+    { "name": "TaskLagUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
+      "about": "Reason for updating task lag. 0 if no update, 1 if task lag is 
updated due to warm up being ready, 2 if warm up failed" },
+    { "name": "TaskLag", "type": "[]TaskLag", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Cumulative lags for standby tasks. Only updatable when 
TaskLagUpdateReason is not zero. Null if unchanged since last heartbeat.",
+      "fields": [
+        { "name": "TaskId", "type": "TaskId", "versions": "0+",
+          "about": "The task ID" },
+        { "name": "Lag", "type": "int64", "versions": "0+",
+          "about": "The cumulative lag for the task" }
+      ]
+    },
+    { "name": "AssignmentConfigs", "type": "[]AssignmentConfig", "versions": 
"0+","nullableVersions": "0+", "default": "null",
+      "about": "Generic array of assignment configuration strings. Null if 
unchanged since last heartbeat.",
+      "fields": [
+        { "name": "Key", "type": "string", "versions": "0+",
+          "about": "key of the config" },
+        { "name": "Value", "type": "string", "versions": "0+",
+          "about": "value of the config" }
+      ]
+    }
+  ],
+  "commonStructs": [
+    { "name": "TaskId", "versions": "0+", "fields": [
+      // We use strings here to avoid committing to a fixed structure for task IDs, 
which will
+      // allow introducing stable naming for subtopologies in the future.
+      { "name": "Subtopology", "type": "string", "versions": "0+",
+        "about": "subtopology ID" },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "partitions" }
+    ]}
+  ]
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
new file mode 100644
index 00000000000..0ab1b0f0ea0
--- /dev/null
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 76,
+  "type": "response",
+  "name": "StreamsHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - INVALID_REQUEST (version 0+)
+  // - UNKNOWN_MEMBER_ID (version 0+)
+  // - FENCED_MEMBER_EPOCH (version 0+)
+  // - UNSUPPORTED_ASSIGNOR (version 0+)
+  // - UNRELEASED_INSTANCE_ID (version 0+)
+  // - GROUP_MAX_SIZE_REACHED (version 0+)
+
+  // Streams-specific errors:
+  // - INCONSISTENT_TOPOLOGY (version 0+)
+  // - MISSING_SOURCE_TOPICS (version 0+)
+  // - MISSING_INTERNAL_TOPICS (version 0+)
+  "fields": [
+    // Same as consumer group heartbeat
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when 
the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+
+    // Assigning specific roles to the consumer
+    { "name": "ShouldComputeAssignment", "type":  "bool", "versions":  "0+", 
"default": false,
+      "about": "true if this streams application should compute the next 
assignment." },
+    { "name": "ShouldInitializeTopology", "type":  "bool", "versions":  "0+", 
"default": false,
+      "about": "true if this streams application should initialize the 
topology on the broker" },
+
+    // The streams app knows which partitions to fetch from given this 
information
+    { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
+    { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local standby tasks for this consumer. Null if unchanged since 
last heartbeat." },
+    { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Local warming up tasks for this consumer. Null if unchanged 
since last heartbeat." },
+
+    // IQ-related information
+    { "name": "PartitionsByHost", "type": "[]HostInfo", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Global assignment information used for IQ. Null if unchanged 
since last heartbeat." ,
+      "fields": [
+        { "name": "Host", "type": "string", "versions": "0+",
+          "about": "Hostname of the node" },
+        { "name": "Port", "type": "int32", "versions": "0+",
+          "about": "Port of the node" },
+        { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
+          "about": "All partitions available on the node" }
+      ]
+    }
+  ],
+  "commonStructs": [
+    { "name": "TopicPartition", "versions": "0+", "fields": [
+      { "name": "Topic", "type": "string", "versions": "0+",
+        "about": "topic name" },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "partitions" }
+    ]},
+    { "name": "TaskId", "versions": "0+", "fields": [
+      // We use strings here to avoid committing to a fixed structure for task IDs, 
which will
+      // allow introducing stable naming for subtopologies in the future.
+      { "name": "Subtopology", "type": "string", "versions": "0+",
+        "about": "subtopology ID" },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "partitions" }
+    ]}
+  ]
+}

Reply via email to