This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7cd85abe2db709bec362e98ae40fe056006e7e25
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu May 16 22:58:44 2024 +0200

    Resolve merge conflict from 11/25 trunk rebase - Define RPCs to perform 
client-side assignment
    
    Even if we do not implement client-side assignment in the POC, let's define 
all public interfaces that will be part of the KIP in the branch, so that we 
have a single source of truth.
    
    Also updates the RPCs following discussions with Responsive:
    
     * Adds "userdata" for client-side assignment to the RPCs, as requested by 
Sophie.
     * Use task offsets instead of task lags
     * Allow the task offsets to be updated for 4 reasons
    
    See https://github.com/lucasbru/kafka/pull/8
---
 .../common/errors/StreamsInvalidAssignment.java    | 23 ++++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  4 +-
 .../org/apache/kafka/common/protocol/Errors.java   |  1 +
 .../requests/StreamsInstallAssignmentRequest.java  | 63 ++++++++++++++
 .../requests/StreamsInstallAssignmentResponse.java | 62 ++++++++++++++
 .../requests/StreamsPrepareAssignmentRequest.java  | 63 ++++++++++++++
 .../requests/StreamsPrepareAssignmentResponse.java | 62 ++++++++++++++
 .../common/message/OffsetCommitRequest.json        |  2 +-
 .../common/message/OffsetFetchRequest.json         |  4 +-
 .../common/message/StreamsHeartbeatRequest.json    | 17 ++--
 .../message/StreamsInstallAssignmentRequest.json   | 39 +++++++++
 .../message/StreamsInstallAssignmentResponse.json  | 26 ++++++
 .../message/StreamsPrepareAssignmentRequest.json   | 16 ++++
 .../message/StreamsPrepareAssignmentResponse.json  | 96 ++++++++++++++++++++++
 14 files changed, 466 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java
new file mode 100644
index 00000000000..dd6509f90f0
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsInvalidAssignment extends ApiException {
+    public StreamsInvalidAssignment(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index fa9600d8536..7924677af3f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -132,7 +132,9 @@ public enum ApiKeys {
     DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
     
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, 
true),
     STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT),
-    STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE);
+    STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE),
+    STREAMS_INSTALL_ASSIGNMENT(ApiMessageType.STREAMS_INSTALL_ASSIGNMENT),
+    STREAMS_PREPARE_ASSIGNMENT(ApiMessageType.STREAMS_PREPARE_ASSIGNMENT);
     
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
APIS_BY_LISTENER =
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 6246c4b4d14..9790d87863a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.StreamsInconsistentTopologyException;
+import org.apache.kafka.common.errors.StreamsInvalidAssignment;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
 import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException;
 import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
new file mode 100644
index 00000000000..d793101c537
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
@@ -0,0 +1,63 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class StreamsInstallAssignmentRequest extends AbstractRequest  {
+
+    public static class Builder extends 
AbstractRequest.Builder<StreamsInstallAssignmentRequest> {
+        private final StreamsInstallAssignmentRequestData data;
+
+        public Builder(StreamsInstallAssignmentRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsInstallAssignmentRequestData data, boolean 
enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, 
enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsInstallAssignmentRequest build(short version) {
+            return new StreamsInstallAssignmentRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsInstallAssignmentRequestData data;
+
+    public StreamsInstallAssignmentRequest(StreamsInstallAssignmentRequestData 
data, short version) {
+        super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, version);
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new StreamsInstallAssignmentResponse(
+            new StreamsInstallAssignmentResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+        );
+    }
+
+    @Override
+    public StreamsInstallAssignmentRequestData data() {
+        return data;
+    }
+
+    public static StreamsInstallAssignmentRequest parse(ByteBuffer buffer, 
short version) {
+        return new StreamsInstallAssignmentRequest(new 
StreamsInstallAssignmentRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
new file mode 100644
index 00000000000..3a09420f796
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
@@ -0,0 +1,62 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ * - {@link Errors#STALE_MEMBER_EPOCH}
+ * - {@link Errors#STREAMS_INVALID_ASSIGNMENT}
+ */
+public class StreamsInstallAssignmentResponse extends AbstractResponse {
+
+    private final StreamsInstallAssignmentResponseData data;
+
+    public 
StreamsInstallAssignmentResponse(StreamsInstallAssignmentResponseData data) {
+        super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsInstallAssignmentResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsInstallAssignmentResponse parse(ByteBuffer buffer, 
short version) {
+        return new StreamsInstallAssignmentResponse(new 
StreamsInstallAssignmentResponseData(
+            new ByteBufferAccessor(buffer), version));
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
new file mode 100644
index 00000000000..3c83273bf4f
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
@@ -0,0 +1,63 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
+import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class StreamsPrepareAssignmentRequest extends AbstractRequest  {
+
+    public static class Builder extends 
AbstractRequest.Builder<StreamsPrepareAssignmentRequest> {
+        private final StreamsPrepareAssignmentRequestData data;
+
+        public Builder(StreamsPrepareAssignmentRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsPrepareAssignmentRequestData data, boolean 
enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, 
enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsPrepareAssignmentRequest build(short version) {
+            return new StreamsPrepareAssignmentRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsPrepareAssignmentRequestData data;
+
+    public StreamsPrepareAssignmentRequest(StreamsPrepareAssignmentRequestData 
data, short version) {
+        super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, version);
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new StreamsPrepareAssignmentResponse(
+            new StreamsPrepareAssignmentResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+        );
+    }
+
+    @Override
+    public StreamsPrepareAssignmentRequestData data() {
+        return data;
+    }
+
+    public static StreamsPrepareAssignmentRequest parse(ByteBuffer buffer, 
short version) {
+        return new StreamsPrepareAssignmentRequest(new 
StreamsPrepareAssignmentRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
new file mode 100644
index 00000000000..99997022783
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
@@ -0,0 +1,62 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#GROUP_ID_NOT_FOUND}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ * - {@link Errors#STALE_MEMBER_EPOCH}
+ */
+public class StreamsPrepareAssignmentResponse extends AbstractResponse {
+
+    private final StreamsPrepareAssignmentResponseData data;
+
+    public 
StreamsPrepareAssignmentResponse(StreamsPrepareAssignmentResponseData data) {
+        super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsPrepareAssignmentResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsPrepareAssignmentResponse parse(ByteBuffer buffer, 
short version) {
+        return new StreamsPrepareAssignmentResponse(new 
StreamsPrepareAssignmentResponseData(
+            new ByteBufferAccessor(buffer), version));
+    }
+    
+}
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json 
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index 5b3029a5301..3cb15998420 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -41,7 +41,7 @@
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The unique group identifier." },
     { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", 
"default": "-1", "ignorable": true,
-      "about": "The generation of the group if using the classic group 
protocol or the member epoch if using the consumer protocol." },
+      "about": "The generation of the group if using the classic group 
protocol or the member epoch if using the consumer or streams protocol." },
     { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": 
true,
       "about": "The member ID assigned by the group coordinator." },
     { "name": "GroupInstanceId", "type": "string", "versions": "7+",
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json 
b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index 82ac8065413..f154c8db448 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -54,9 +54,9 @@
       { "name": "GroupId", "type": "string", "versions": "8+", "entityType": 
"groupId",
         "about": "The group ID."},
       { "name": "MemberId", "type": "string", "versions": "9+", 
"nullableVersions": "9+", "default": "null", "ignorable": true,
-        "about": "The member id" },
+        "about": "The member ID assigned by the group coordinator if using the 
new consumer or streams protocol (KIP-848 / KIP-1071)." },
       { "name": "MemberEpoch", "type": "int32", "versions": "9+", "default": 
"-1", "ignorable": true,
-        "about": "The member epoch if using the new consumer protocol 
(KIP-848)." },
+        "about": "The member epoch if using the new consumer or streams 
protocol (KIP-848 / KIP-1071)." },
       { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": 
"8+", "nullableVersions": "8+",
         "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
         { "name": "Name", "type": "string", "versions": "8+", "entityType": 
"topicName",
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
index e21762e7e10..c97d0b912ff 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -60,20 +60,21 @@
     { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Used for rack-aware assignment algorithm. Null if unchanged 
since last heartbeat." },
 
-    { "name": "TaskLagUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
-      "about": "Reason for updating task lag. 0 if no update, 1 if task lag is 
updated due to warm up being ready, 2 if warm up failed" },
-    { "name": "TaskLag", "type": "[]TaskLag", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "Cumulative lags for standby tasks. Only updatable when 
TaskLagUpdateReason is not zero. Null if unchanged since last heartbeat.",
+    { "name": "TaskOffsetUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
+      "about": "Reason for updating task offset. 0 if no update, 1 if task 
catches up with acceptable.recovery.lag, 2 if task falls behind 
acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggered 
by a time-interval" },
+    { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Cumulative offsets for assigned and dormant standby tasks. 
Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since 
last heartbeat.",
       "fields": [
         { "name": "TaskId", "type": "TaskId", "versions": "0+",
           "about": "The task ID" },
-        { "name": "Lag", "type": "int64", "versions": "0+",
-          "about": "The cumulative lag for the task" }
+        { "name": "Offset", "type": "int64", "versions": "0+",
+          "about": "The cumulative offset for the task" }
       ]
     },
+    { "name": "UserData", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Opaque user data to be passed to the client-side assignor. 
Null if unchanged since last heartbeat." },
     { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "Generic array of assignment configuration strings. Null if 
unchanged since last heartbeat."
-    },
+      "about": "Generic array of assignment configuration strings. Null if 
unchanged since last heartbeat." },
     { "name": "ShutdownApplication", "type": "bool", "versions": "0+", 
"default": false,
       "about": "Whether all Streams clients in the group should shut down." }
   ],
diff --git 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
new file mode 100644
index 00000000000..27d15cc66ec
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json
@@ -0,0 +1,39 @@
+{
+  "apiKey": 78,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "StreamsInstallAssignmentRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The member id assigned by the group coordinator." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
+      "about": "The group epoch." },
+    { "name": "Error", "type": "int8", "versions": "0+",
+      "about": "The assignment error; or zero if the assignment is 
successful." },
+    { "name": "Members", "type": "[]Member", "versions": "0+",
+      "about": "The members.", "fields": [
+      { "name": "MemberId", "type": "string", "versions": "0+",
+        "about": "The member ID." },
+      { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Local assignment for this consumer." },
+      { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Local standby tasks for this consumer." },
+      { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Local warming up tasks for this consumer." }
+    ]}
+  ],
+  "commonStructs": [
+    { "name": "TaskId", "versions": "0+", "fields": [
+      { "name": "Subtopology", "type": "string", "versions": "0+",
+        "about": "subtopology ID" },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "partitions" }
+    ]}
+  ]
+}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
new file mode 100644
index 00000000000..b1fe8e00644
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json
@@ -0,0 +1,26 @@
+{
+  "apiKey": 78,
+  "type": "response",
+  "name": "StreamsInstallAssignmentResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - INVALID_GROUP_ID
+  // - GROUP_ID_NOT_FOUND
+  // - UNKNOWN_MEMBER_ID
+  // - STALE_MEMBER_EPOCH
+  // - STREAMS_INVALID_ASSIGNMENT
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." }
+  ]
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
new file mode 100644
index 00000000000..12b9098940c
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json
@@ -0,0 +1,16 @@
+{
+  "apiKey": 79,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "StreamsPrepareAssignmentRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The member id assigned by the group coordinator." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." }
+  ]
+}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
new file mode 100644
index 00000000000..c917931a568
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json
@@ -0,0 +1,96 @@
+{
+  "apiKey": 79,
+  "type": "response",
+  "name": "StreamsPrepareAssignmentResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - INVALID_GROUP_ID
+  // - GROUP_ID_NOT_FOUND
+  // - UNKNOWN_MEMBER_ID
+  // - STALE_MEMBER_EPOCH
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
+      "about": "The group epoch." },
+    { "name": "AssignorName", "type": "string", "versions": "0+",
+      "about": "The selected assignor." },
+    { "name": "Members", "type": "[]Member", "versions": "0+",
+      "about": "The members.", "fields": [
+      { "name": "MemberId", "type": "string", "versions": "0+",
+        "about": "The member ID." },
+      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+        "about": "The member epoch." },
+      { "name": "InstanceId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "The member instance ID." },
+      { "name": "RackId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "The rack ID." },
+
+      { "name": "TopologyHash", "type": "bytes", "versions": "0+",
+        "about": "The hash of the topology." },
+
+      { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Target active tasks for this consumer." },
+      { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Target standby tasks for this consumer." },
+      { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+",
+        "about": "Target warming up tasks for this consumer. " },
+
+      { "name": "ProcessId", "type": "uuid", "versions": "0+",
+        "about": "Identity of the streams instance that may have multiple 
consumers. " },
+      { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
+        "about": "Used for rack-aware assignment algorithm." },
+      { "name": "TaskOffsetUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
+        "about": "Reason for updating task offset. 0 if no update, 1 if task 
catches up with acceptable.recovery.lag, 2 if task falls behind 
acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggered 
by a time-interval" },
+      { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "Cumulative offsets for assigned and dormant standby tasks. 
Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since 
last heartbeat.",
+        "fields": [
+          { "name": "TaskId", "type": "TaskId", "versions": "0+",
+            "about": "The task ID" },
+          { "name": "Offset", "type": "int64", "versions": "0+",
+            "about": "The cumulative offset for the task" }
+        ]
+      },
+      { "name": "UserData", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "Opaque user data to be passed to the client-side assignor. 
Null if unchanged since last heartbeat." },
+      { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "Generic array of assignment configuration strings."}
+    ]},
+    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
+      "about": "The topic-partition metadata.",
+      "fields": [
+        { "name": "TopicId", "type": "uuid", "versions": "0+",
+          "about": "The topic ID." },
+        { "name": "TopicName", "type": "string", "versions": "0+",
+          "about": "The topic name." },
+        { "name": "NumPartitions", "type": "int32", "versions": "0+",
+          "about": "The number of partitions." }
+      ]}
+  ],
+  "commonStructs": [
+    { "name": "TaskId", "versions": "0+", "fields": [
+      { "name": "Subtopology", "type": "string", "versions": "0+",
+        "about": "subtopology ID" },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "partitions" }
+    ]},
+    { "name": "KeyValue", "versions": "0+",
+      "fields": [
+        { "name": "Key", "type": "string", "versions": "0+",
+          "about": "key of the config" },
+        { "name": "Value", "type": "string", "versions": "0+",
+          "about": "value of the config" }
+      ]
+    }
+  ]
+}
\ No newline at end of file

Reply via email to