This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7cd85abe2db709bec362e98ae40fe056006e7e25 Author: Lucas Brutschy <[email protected]> AuthorDate: Thu May 16 22:58:44 2024 +0200 Resolve merge conflict from 11/25 trunk rebase - Define RPCs to perform client-side assignment Even if we do not implement client-side assignment in the POC, let's define all public interfaces that will be part of the KIP in the branch, so that we have a single source of truth. Also updates the RPCs following discussions with responsive * Adds "userdata" for client-side assignment to the RPCs, as requested by Sophie. * Use task offsets instead of task lags * Allow the task offsets to be updated for 4 reasons See https://github.com/lucasbru/kafka/pull/8 --- .../common/errors/StreamsInvalidAssignment.java | 23 ++++++ .../org/apache/kafka/common/protocol/ApiKeys.java | 4 +- .../org/apache/kafka/common/protocol/Errors.java | 1 + .../requests/StreamsInstallAssignmentRequest.java | 63 ++++++++++++++ .../requests/StreamsInstallAssignmentResponse.java | 62 ++++++++++++++ .../requests/StreamsPrepareAssignmentRequest.java | 63 ++++++++++++++ .../requests/StreamsPrepareAssignmentResponse.java | 62 ++++++++++++++ .../common/message/OffsetCommitRequest.json | 2 +- .../common/message/OffsetFetchRequest.json | 4 +- .../common/message/StreamsHeartbeatRequest.json | 17 ++-- .../message/StreamsInstallAssignmentRequest.json | 39 +++++++++ .../message/StreamsInstallAssignmentResponse.json | 26 ++++++ .../message/StreamsPrepareAssignmentRequest.json | 16 ++++ .../message/StreamsPrepareAssignmentResponse.json | 96 ++++++++++++++++++++++ 14 files changed, 466 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java new file mode 100644 index 00000000000..dd6509f90f0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInvalidAssignment extends ApiException { + public StreamsInvalidAssignment(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index fa9600d8536..7924677af3f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -132,7 +132,9 @@ public enum ApiKeys { DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true), READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT), - STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE); + STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE), + STREAMS_INSTALL_ASSIGNMENT(ApiMessageType.STREAMS_INSTALL_ASSIGNMENT), + STREAMS_PREPARE_ASSIGNMENT(ApiMessageType.STREAMS_PREPARE_ASSIGNMENT); private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 6246c4b4d14..9790d87863a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; +import org.apache.kafka.common.errors.StreamsInvalidAssignment; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException; import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java new file mode 100644 index 00000000000..d793101c537 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java @@ -0,0 +1,63 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; +import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; +import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class StreamsInstallAssignmentRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<StreamsInstallAssignmentRequest> { + private final StreamsInstallAssignmentRequestData data; + + public Builder(StreamsInstallAssignmentRequestData data) { + this(data, false); + } + + public Builder(StreamsInstallAssignmentRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsInstallAssignmentRequest build(short version) { + return new StreamsInstallAssignmentRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsInstallAssignmentRequestData data; + + public StreamsInstallAssignmentRequest(StreamsInstallAssignmentRequestData data, short version) { + super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsInstallAssignmentResponse( + new StreamsInstallAssignmentResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsInstallAssignmentRequestData data() { + return data; + } + + public static StreamsInstallAssignmentRequest parse(ByteBuffer buffer, short version) { + return new StreamsInstallAssignmentRequest(new StreamsInstallAssignmentRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java new file mode 100644 index 00000000000..3a09420f796 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java @@ -0,0 +1,62 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; +import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#STALE_MEMBER_EPOCH} + * - {@link Errors#STREAMS_INVALID_ASSIGNMENT} + */ +public class StreamsInstallAssignmentResponse extends AbstractResponse { + + private final StreamsInstallAssignmentResponseData data; + + public StreamsInstallAssignmentResponse(StreamsInstallAssignmentResponseData data) { + super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT); + this.data = data; + } + + @Override + public StreamsInstallAssignmentResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsInstallAssignmentResponse parse(ByteBuffer buffer, short version) { + return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java new file mode 100644 index 00000000000..3c83273bf4f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java @@ -0,0 +1,63 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; +import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; +import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class StreamsPrepareAssignmentRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<StreamsPrepareAssignmentRequest> { + private final StreamsPrepareAssignmentRequestData data; + + public Builder(StreamsPrepareAssignmentRequestData data) { + this(data, false); + } + + public Builder(StreamsPrepareAssignmentRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsPrepareAssignmentRequest build(short version) { + return new StreamsPrepareAssignmentRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsPrepareAssignmentRequestData data; + + public StreamsPrepareAssignmentRequest(StreamsPrepareAssignmentRequestData data, short version) { + super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsPrepareAssignmentResponse( + new StreamsPrepareAssignmentResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsPrepareAssignmentRequestData data() { + return data; + } + + public static StreamsPrepareAssignmentRequest parse(ByteBuffer buffer, short version) { + return new StreamsPrepareAssignmentRequest(new StreamsPrepareAssignmentRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java new file mode 100644 index 00000000000..99997022783 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java @@ -0,0 +1,62 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; +import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; +import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#STALE_MEMBER_EPOCH} + */ +public class StreamsPrepareAssignmentResponse extends AbstractResponse { + + private final StreamsPrepareAssignmentResponseData data; + + public StreamsPrepareAssignmentResponse(StreamsPrepareAssignmentResponseData data) { + super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT); + this.data = data; + } + + @Override + public StreamsPrepareAssignmentResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsPrepareAssignmentResponse parse(ByteBuffer buffer, short version) { + return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData( + new ByteBufferAccessor(buffer), version)); + } + +} diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index 5b3029a5301..3cb15998420 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -41,7 +41,7 @@ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, - "about": "The generation of the group if using the classic group protocol or the member epoch if using the consumer protocol." }, + "about": "The generation of the group if using the classic group protocol or the member epoch if using the consumer or streams protocol." }, { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true, "about": "The member ID assigned by the group coordinator." }, { "name": "GroupInstanceId", "type": "string", "versions": "7+", diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json index 82ac8065413..f154c8db448 100644 --- a/clients/src/main/resources/common/message/OffsetFetchRequest.json +++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json @@ -54,9 +54,9 @@ { "name": "GroupId", "type": "string", "versions": "8+", "entityType": "groupId", "about": "The group ID."}, { "name": "MemberId", "type": "string", "versions": "9+", "nullableVersions": "9+", "default": "null", "ignorable": true, - "about": "The member id" }, + "about": "The member ID assigned by the group coordinator if using the new consumer or streams protocol (KIP-848 / KIP-XXX)." }, { "name": "MemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true, - "about": "The member epoch if using the new consumer protocol (KIP-848)." }, + "about": "The member epoch if using the new consumer or streams protocol (KIP-848 / KIP-XXX)." }, { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json index e21762e7e10..c97d0b912ff 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -60,20 +60,21 @@ { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." }, - { "name": "TaskLagUpdateReason", "type": "int8", "versions": "0+", "default": 0, - "about": "Reason for updating task lag. 0 if no update, 1 if task lag is updated due to warm up being ready, 2 if warm up failed" }, - { "name": "TaskLag", "type": "[]TaskLag", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "Cumulative lags for standby tasks. Only updatable when TaskLagUpdateReason is not zero. Null if unchanged since last heartbeat.", + { "name": "TaskOffsetUpdateReason", "type": "int8", "versions": "0+", "default": 0, + "about": "Reason for updating task offset. 0 if no update, 1 if task catches up with acceptable.recovery.lag, 2 if task falls behind acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggerd by a time-interval" }, + { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Cumulative offsets for assigned and dormant standby tasks. Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since last heartbeat.", "fields": [ { "name": "TaskId", "type": "TaskId", "versions": "0+", "about": "The task ID" }, - { "name": "Lag", "type": "int64", "versions": "0+", - "about": "The cumulative lag for the task" } + { "name": "Offset", "type": "int64", "versions": "0+", + "about": "The cumulative offset for the task" } ] }, + { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Opaque user data to be passed to the client-side assignor. Null if unchanged since last heartbeat." }, { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "Generic array of assignment configuration strings. Null if unchanged since last heartbeat." - }, + "about": "Generic array of assignment configuration strings. Null if unchanged since last heartbeat." }, { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false, "about": "Whether all Streams clients in the group should shut down." } ], diff --git a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json new file mode 100644 index 00000000000..27d15cc66ec --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json @@ -0,0 +1,39 @@ +{ + "apiKey": 78, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "StreamsInstallAssignmentRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member id assigned by the group coordinator." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "Error", "type": "int8", "versions": "0+", + "about": "The assignment error; or zero if the assignment is successful." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", "fields": [ + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member ID." }, + { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", + "about": "Local assignment for this consumer." }, + { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", + "about": "Local standby tasks for this consumer." }, + { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", + "about": "Local warming up tasks for this consumer." } + ]} + ], + "commonStructs": [ + { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "subtopology ID" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]} + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json new file mode 100644 index 00000000000..b1fe8e00644 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json @@ -0,0 +1,26 @@ +{ + "apiKey": 78, + "type": "response", + "name": "StreamsInstallAssignmentResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - INVALID_GROUP_ID + // - GROUP_ID_NOT_FOUND + // - UNKNOWN_MEMBER_ID + // - STALE_MEMBER_EPOCH + // - STREAMS_INVALID_ASSIGNMENT + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." } + ] +} diff --git a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json new file mode 100644 index 00000000000..12b9098940c --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json @@ -0,0 +1,16 @@ +{ + "apiKey": 79, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "StreamsPrepareAssignmentRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member id assigned by the group coordinator." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json new file mode 100644 index 00000000000..c917931a568 --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json @@ -0,0 +1,96 @@ +{ + "apiKey": 79, + "type": "response", + "name": "StreamsPrepareAssignmentResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - INVALID_GROUP_ID + // - GROUP_ID_NOT_FOUND + // - UNKNOWN_MEMBER_ID + // - STALE_MEMBER_EPOCH + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", "fields": [ + { "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The member epoch." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The rack ID." }, + + { "name": "TopologyHash", "type": "bytes", "versions": "0+", + "about": "The hash of the topology." }, + + { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", + "about": "Target active tasks for this consumer." }, + { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", + "about": "Target standby tasks for this consumer." }, + { "name": "WarmupTasks", "type": "[]TaskId", "versions": "0+", + "about": "Target warming up tasks for this consumer. " }, + + { "name": "ProcessId", "type": "uuid", "versions": "0+", + "about": "Identity of the streams instance that may have multiple consumers. " }, + { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", + "about": "Used for rack-aware assignment algorithm." }, + { "name": "TaskOffsetUpdateReason", "type": "int8", "versions": "0+", "default": 0, + "about": "Reason for updating task offset. 0 if no update, 1 if task catches up with acceptable.recovery.lag, 2 if task falls behind acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggerd by a time-interval" }, + { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Cumulative offsets for assigned and dormant standby tasks. Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since last heartbeat.", + "fields": [ + { "name": "TaskId", "type": "TaskId", "versions": "0+", + "about": "The task ID" }, + { "name": "Offset", "type": "int64", "versions": "0+", + "about": "The cumulative offset for the task" } + ] + }, + { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Opaque user data to be passed to the client-side assignor. Null if unchanged since last heartbeat." }, + { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Generic array of assignment configuration strings."} + ]}, + { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+", + "about": "The topic-partition metadata.", + "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic ID." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name." }, + { "name": "NumPartitions", "type": "int32", "versions": "0+", + "about": "The number of partitions." } + ]} + ], + "commonStructs": [ + { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "subtopology ID" }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "partitions" } + ]}, + { "name": "KeyValue", "versions": "0+", + "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ] + } + ] +} \ No newline at end of file
