This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6c36a43fe9d6f9bc69a279ba418f094900e3321a Author: Bruno Cadonna <[email protected]> AuthorDate: Tue Jul 16 16:02:52 2024 +0200 Resolve conflicts for 11/25 trunk rebase - Rebased on AK trunk 2024-07-16 --- .../org/apache/kafka/common/protocol/ApiKeys.java | 2 - .../requests/StreamsInstallAssignmentRequest.java | 77 ----------------- .../requests/StreamsInstallAssignmentResponse.java | 75 ----------------- .../requests/StreamsPrepareAssignmentRequest.java | 77 ----------------- .../requests/StreamsPrepareAssignmentResponse.java | 75 ----------------- .../message/StreamsGroupDescribeRequest.json | 2 +- .../message/StreamsGroupDescribeResponse.json | 2 +- .../common/message/StreamsHeartbeatRequest.json | 2 +- .../common/message/StreamsHeartbeatResponse.json | 2 +- .../common/message/StreamsInitializeRequest.json | 2 +- .../common/message/StreamsInitializeResponse.json | 2 +- .../message/StreamsInstallAssignmentRequest.json | 39 --------- .../message/StreamsInstallAssignmentResponse.json | 26 ------ .../message/StreamsPrepareAssignmentRequest.json | 16 ---- .../message/StreamsPrepareAssignmentResponse.json | 98 ---------------------- .../kafka/common/requests/RequestResponseTest.java | 35 ++++---- .../group/GroupCoordinatorRecordHelpers.java | 73 ++++------------ .../coordinator/group/GroupCoordinatorService.java | 4 +- .../coordinator/group/GroupCoordinatorShard.java | 28 ++++--- .../coordinator/group/GroupMetadataManager.java | 64 ++++++++++++++ .../coordinator/group/streams/StreamsGroup.java | 3 +- .../StreamsGroupCurrentMemberAssignmentKey.json | 6 +- .../message/StreamsGroupMemberMetadataKey.json | 6 +- .../common/message/StreamsGroupMetadataKey.json | 4 +- .../message/StreamsGroupPartitionMetadataKey.json | 4 +- .../StreamsGroupTargetAssignmentMemberKey.json | 6 +- .../StreamsGroupTargetAssignmentMetadataKey.json | 4 +- .../common/message/StreamsGroupTopologyKey.json | 4 +- .../group/GroupCoordinatorRecordHelpersTest.java | 2 +- .../group/streams/StreamsGroupTest.java | 17 ++-- .../SmokeTestDriverIntegrationTest.java | 3 +- 31 files changed, 151 insertions(+), 609 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 34ed496e1ec..35548b52ebf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -133,8 +133,6 @@ public enum ApiKeys { READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT), STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE), - STREAMS_INSTALL_ASSIGNMENT(ApiMessageType.STREAMS_INSTALL_ASSIGNMENT), - STREAMS_PREPARE_ASSIGNMENT(ApiMessageType.STREAMS_PREPARE_ASSIGNMENT), STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java deleted file mode 100644 index 7063ec61676..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; -import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.common.protocol.Errors; - -import java.nio.ByteBuffer; - -public class StreamsInstallAssignmentRequest extends AbstractRequest { - - public static class Builder extends AbstractRequest.Builder<StreamsInstallAssignmentRequest> { - private final StreamsInstallAssignmentRequestData data; - - public Builder(StreamsInstallAssignmentRequestData data) { - this(data, false); - } - - public Builder(StreamsInstallAssignmentRequestData data, boolean enableUnstableLastVersion) { - super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, enableUnstableLastVersion); - this.data = data; - } - - @Override - public StreamsInstallAssignmentRequest build(short version) { - return new StreamsInstallAssignmentRequest(data, version); - } - - @Override - public String toString() { - return data.toString(); - } - } - - private final StreamsInstallAssignmentRequestData data; - - public StreamsInstallAssignmentRequest(StreamsInstallAssignmentRequestData data, short version) { - super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT, version); - this.data = data; - } - - @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new StreamsInstallAssignmentResponse( - new StreamsInstallAssignmentResponseData() - .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(Errors.forException(e).code()) - ); - } - - @Override - public StreamsInstallAssignmentRequestData data() { - return data; - } - - public static StreamsInstallAssignmentRequest parse(ByteBuffer buffer, short version) { - return new StreamsInstallAssignmentRequest(new StreamsInstallAssignmentRequestData( - new ByteBufferAccessor(buffer), version), version); - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java deleted file mode 100644 index cc389760577..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.common.protocol.Errors; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Map; - -/** - * Possible error codes. - * - * - {@link Errors#GROUP_AUTHORIZATION_FAILED} - * - {@link Errors#NOT_COORDINATOR} - * - {@link Errors#COORDINATOR_NOT_AVAILABLE} - * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} - * - {@link Errors#INVALID_REQUEST} - * - {@link Errors#INVALID_GROUP_ID} - * - {@link Errors#GROUP_ID_NOT_FOUND} - * - {@link Errors#UNKNOWN_MEMBER_ID} - * - {@link Errors#STALE_MEMBER_EPOCH} - * - {@link Errors#STREAMS_INVALID_ASSIGNMENT} - */ -public class StreamsInstallAssignmentResponse extends AbstractResponse { - - private final StreamsInstallAssignmentResponseData data; - - public StreamsInstallAssignmentResponse(StreamsInstallAssignmentResponseData data) { - super(ApiKeys.STREAMS_INSTALL_ASSIGNMENT); - this.data = data; - } - - @Override - public StreamsInstallAssignmentResponseData data() { - return data; - } - - @Override - public Map<Errors, Integer> errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); - } - - @Override - public int throttleTimeMs() { - return data.throttleTimeMs(); - } - - @Override - public void maybeSetThrottleTimeMs(int throttleTimeMs) { - data.setThrottleTimeMs(throttleTimeMs); - } - - public static StreamsInstallAssignmentResponse parse(ByteBuffer buffer, short version) { - return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData( - new ByteBufferAccessor(buffer), version)); - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java deleted file mode 100644 index adeef79393b..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; -import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.common.protocol.Errors; - -import java.nio.ByteBuffer; - -public class StreamsPrepareAssignmentRequest extends AbstractRequest { - - public static class Builder extends AbstractRequest.Builder<StreamsPrepareAssignmentRequest> { - private final StreamsPrepareAssignmentRequestData data; - - public Builder(StreamsPrepareAssignmentRequestData data) { - this(data, false); - } - - public Builder(StreamsPrepareAssignmentRequestData data, boolean enableUnstableLastVersion) { - super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, enableUnstableLastVersion); - this.data = data; - } - - @Override - public StreamsPrepareAssignmentRequest build(short version) { - return new StreamsPrepareAssignmentRequest(data, version); - } - - @Override - public String toString() { - return data.toString(); - } - } - - private final StreamsPrepareAssignmentRequestData data; - - public StreamsPrepareAssignmentRequest(StreamsPrepareAssignmentRequestData data, short version) { - super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT, version); - this.data = data; - } - - @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new StreamsPrepareAssignmentResponse( - new StreamsPrepareAssignmentResponseData() - .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(Errors.forException(e).code()) - ); - } - - @Override - public StreamsPrepareAssignmentRequestData data() { - return data; - } - - public static StreamsPrepareAssignmentRequest parse(ByteBuffer buffer, short version) { - return new StreamsPrepareAssignmentRequest(new StreamsPrepareAssignmentRequestData( - new ByteBufferAccessor(buffer), version), version); - } -} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java deleted file mode 100644 index f0c478ec81b..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ByteBufferAccessor; -import org.apache.kafka.common.protocol.Errors; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Map; - -/** - * Possible error codes. - * - * - {@link Errors#GROUP_AUTHORIZATION_FAILED} - * - {@link Errors#NOT_COORDINATOR} - * - {@link Errors#COORDINATOR_NOT_AVAILABLE} - * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} - * - {@link Errors#INVALID_REQUEST} - * - {@link Errors#INVALID_GROUP_ID} - * - {@link Errors#GROUP_ID_NOT_FOUND} - * - {@link Errors#UNKNOWN_MEMBER_ID} - * - {@link Errors#STALE_MEMBER_EPOCH} - */ -public class StreamsPrepareAssignmentResponse extends AbstractResponse { - - private final StreamsPrepareAssignmentResponseData data; - - public StreamsPrepareAssignmentResponse(StreamsPrepareAssignmentResponseData data) { - super(ApiKeys.STREAMS_PREPARE_ASSIGNMENT); - this.data = data; - } - - @Override - public StreamsPrepareAssignmentResponseData data() { - return data; - } - - @Override - public Map<Errors, Integer> errorCounts() { - return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); - } - - @Override - public int throttleTimeMs() { - return data.throttleTimeMs(); - } - - @Override - public void maybeSetThrottleTimeMs(int throttleTimeMs) { - data.setThrottleTimeMs(throttleTimeMs); - } - - public static StreamsPrepareAssignmentResponse parse(ByteBuffer buffer, short version) { - return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData( - new ByteBufferAccessor(buffer), version)); - } - -} diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json index 426e9dd81ed..2c828455a67 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 80, + "apiKey": 90, "type": "request", "listeners": ["zkBroker", "broker"], "name": "StreamsGroupDescribeRequest", diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 3080631d66d..ae4b3f8ea35 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 80, + "apiKey": 90, "type": "response", "name": "StreamsGroupDescribeResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json index 2ac6ce2e6e9..4121133c73c 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 76, + "apiKey": 89, "type": "request", "listeners": ["zkBroker", "broker"], "name": "StreamsHeartbeatRequest", diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json index 2a3e7f1ea80..9cbbdee2a58 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 76, + "apiKey": 89, "type": "response", "name": "StreamsHeartbeatResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/StreamsInitializeRequest.json b/clients/src/main/resources/common/message/StreamsInitializeRequest.json index 5acc772321c..b93fd3d7a7f 100644 --- a/clients/src/main/resources/common/message/StreamsInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsInitializeRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 77, + "apiKey": 88, "type": "request", "listeners": ["zkBroker", "broker"], "name": "StreamsInitializeRequest", diff --git a/clients/src/main/resources/common/message/StreamsInitializeResponse.json b/clients/src/main/resources/common/message/StreamsInitializeResponse.json index ff73577303e..2b4be5f778d 100644 --- a/clients/src/main/resources/common/message/StreamsInitializeResponse.json +++ b/clients/src/main/resources/common/message/StreamsInitializeResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 77, + "apiKey": 88, "type": "response", "name": "StreamsInitializeResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json b/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json deleted file mode 100644 index 847bb9bb215..00000000000 --- a/clients/src/main/resources/common/message/StreamsInstallAssignmentRequest.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - "apiKey": 78, - "type": "request", - "listeners": ["zkBroker", "broker"], - "name": "StreamsInstallAssignmentRequest", - "validVersions": "0", - "flexibleVersions": "0+", - "fields": [ - { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", - "about": "The group identifier." }, - { "name": "MemberId", "type": "string", "versions": "0+", - "about": "The member id assigned by the group coordinator." }, - { "name": "MemberEpoch", "type": "int32", "versions": "0+", - "about": "The member epoch." }, - { "name": "GroupEpoch", "type": "int32", "versions": "0+", - "about": "The group epoch." }, - { "name": "Error", "type": "int8", "versions": "0+", - "about": "The assignment error; or zero if the assignment is successful." }, - { "name": "Members", "type": "[]Member", "versions": "0+", - "about": "The members.", "fields": [ - { "name": "MemberId", "type": "string", "versions": "0+", - "about": "The member ID." }, - { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Local assignment for this consumer." }, - { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Local standby tasks for this consumer." }, - { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Local warming up tasks for this consumer." } - ]} - ], - "commonStructs": [ - { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "subtopology ID" }, - { "name": "Partitions", "type": "[]int32", "versions": "0+", - "about": "partitions" } - ]} - ] -} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json b/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json deleted file mode 100644 index b1fe8e00644..00000000000 --- a/clients/src/main/resources/common/message/StreamsInstallAssignmentResponse.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "apiKey": 78, - "type": "response", - "name": "StreamsInstallAssignmentResponse", - "validVersions": "0", - "flexibleVersions": "0+", - // Supported errors: - // - GROUP_AUTHORIZATION_FAILED - // - NOT_COORDINATOR - // - COORDINATOR_NOT_AVAILABLE - // - COORDINATOR_LOAD_IN_PROGRESS - // - INVALID_REQUEST - // - INVALID_GROUP_ID - // - GROUP_ID_NOT_FOUND - // - UNKNOWN_MEMBER_ID - // - STALE_MEMBER_EPOCH - // - STREAMS_INVALID_ASSIGNMENT - "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The top-level error code, or 0 if there was no error" }, - { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The top-level error message, or null if there was no error." } - ] -} diff --git a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json b/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json deleted file mode 100644 index 12b9098940c..00000000000 --- a/clients/src/main/resources/common/message/StreamsPrepareAssignmentRequest.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "apiKey": 79, - "type": "request", - "listeners": ["zkBroker", "broker"], - "name": "StreamsPrepareAssignmentRequest", - "validVersions": "0", - "flexibleVersions": "0+", - "fields": [ - { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", - "about": "The group identifier." }, - { "name": "MemberId", "type": "string", "versions": "0+", - "about": "The member id assigned by the group coordinator." }, - { "name": "MemberEpoch", "type": "int32", "versions": "0+", - "about": "The member epoch." } - ] -} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json b/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json deleted file mode 100644 index 1d9622caa9e..00000000000 --- a/clients/src/main/resources/common/message/StreamsPrepareAssignmentResponse.json +++ /dev/null @@ -1,98 +0,0 @@ -{ - "apiKey": 79, - "type": "response", - "name": "StreamsPrepareAssignmentResponse", - "validVersions": "0", - "flexibleVersions": "0+", - // Supported errors: - // - GROUP_AUTHORIZATION_FAILED - // - NOT_COORDINATOR - // - COORDINATOR_NOT_AVAILABLE - // - COORDINATOR_LOAD_IN_PROGRESS - // - INVALID_REQUEST - // - INVALID_GROUP_ID - // - GROUP_ID_NOT_FOUND - // - UNKNOWN_MEMBER_ID - // - STALE_MEMBER_EPOCH - "fields": [ - { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", - "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The top-level error code, or 0 if there was no error" }, - { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The top-level error message, or null if there was no error." }, - { "name": "GroupEpoch", "type": "int32", "versions": "0+", - "about": "The group epoch." }, - { "name": "AssignorName", "type": "string", "versions": "0+", - "about": "The selected assignor." }, - { "name": "Members", "type": "[]Member", "versions": "0+", - "about": "The members.", "fields": [ - { "name": "MemberId", "type": "string", "versions": "0+", - "about": "The member ID." }, - { "name": "MemberEpoch", "type": "int32", "versions": "0+", - "about": "The member epoch." }, - { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The member instance ID." }, - { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "The rack ID." }, - - { "name": "TopologyHash", "type": "bytes", "versions": "0+", - "about": "The hash of the topology." }, - - { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Target active tasks for this consumer." }, - { "name": "StandbyTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Target standby tasks for this consumer." }, - { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", - "about": "Target warming up tasks for this consumer. " }, - - { "name": "ProcessId", "type": "uuid", "versions": "0+", - "about": "Identity of the streams instance that may have multiple consumers. " }, - { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", - "about": "Used for rack-aware assignment algorithm." }, - { "name": "TaskOffsetUpdateReason", "type": "int8", "versions": "0+", "default": 0, - "about": "Reason for updating task offset. 0 if no update, 1 if task catches up with acceptable.recovery.lag, 2 if task falls behind acceptable.recovery.lag, 3 if a standby task is added or removed, 4 if triggerd by a time-interval" }, - { "name": "TaskOffset", "type": "[]TaskOffset", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "Cumulative offsets for assigned and dormant standby tasks. Only updatable when TaskOffsetUpdateReason is not zero. Null if unchanged since last heartbeat.", - "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "A unique identifier of the subtopology." }, - { "name": "Partition", "type": "int32", "versions": "0+", - "about": "The partition of the input topics." }, - { "name": "Offset", "type": "int64", "versions": "0+", - "about": "The cumulative offset for the task" } - ] - }, - { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "Opaque user data to be passed to the client-side assignor. Null if unchanged since last heartbeat." }, - { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "Generic array of assignment configuration strings."} - ]}, - { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+", - "about": "The topic-partition metadata.", - "fields": [ - { "name": "TopicId", "type": "uuid", "versions": "0+", - "about": "The topic ID." }, - { "name": "TopicName", "type": "string", "versions": "0+", - "about": "The topic name." }, - { "name": "NumPartitions", "type": "int32", "versions": "0+", - "about": "The number of partitions." } - ]} - ], - "commonStructs": [ - { "name": "TaskIds", "versions": "0+", "fields": [ - { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "subtopology ID" }, - { "name": "Partitions", "type": "[]int32", "versions": "0+", - "about": "partitions" } - ]}, - { "name": "KeyValue", "versions": "0+", - "fields": [ - { "name": "Key", "type": "string", "versions": "0+", - "about": "key of the config" }, - { "name": "Value", "type": "string", "versions": "0+", - "about": "value of the config" } - ] - } - ] -} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index da4ae96af47..1772de05e85 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -238,10 +238,6 @@ import org.apache.kafka.common.message.StreamsHeartbeatRequestData; import org.apache.kafka.common.message.StreamsHeartbeatResponseData; import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.message.StreamsInitializeResponseData; -import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; -import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; -import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; -import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -4027,13 +4023,13 @@ public class RequestResponseTest { return new ReadShareGroupStateSummaryResponse(data); } - private AbstractRequest createStreamsPrepareAssignmentRequest(final short version) { - return new StreamsPrepareAssignmentRequest.Builder(new StreamsPrepareAssignmentRequestData()).build(version); - } - - private AbstractRequest createStreamsInstallAssignmentRequest(final short version) { - return new StreamsInstallAssignmentRequest.Builder(new StreamsInstallAssignmentRequestData()).build(version); - } +// private AbstractRequest createStreamsPrepareAssignmentRequest(final short version) { +// return new StreamsPrepareAssignmentRequest.Builder(new StreamsPrepareAssignmentRequestData()).build(version); +// } +// +// private AbstractRequest createStreamsInstallAssignmentRequest(final short version) { +// return new StreamsInstallAssignmentRequest.Builder(new StreamsInstallAssignmentRequestData()).build(version); +// } private AbstractRequest createStreamsInitializeRequest(final short version) { return new StreamsInitializeRequest.Builder(new StreamsInitializeRequestData()).build(version); @@ -4043,20 +4039,21 @@ public class RequestResponseTest { return new StreamsHeartbeatRequest.Builder(new StreamsHeartbeatRequestData()).build(version); } - private AbstractResponse createStreamsPrepareAssignmentResponse() { - return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData()); - } - - private AbstractResponse createStreamsInstallAssignmentResponse() { - return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData()); - } - +// private AbstractResponse createStreamsPrepareAssignmentResponse() { +// return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData()); +// } +// +// private AbstractResponse createStreamsInstallAssignmentResponse() { +// return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData()); +// } +// private AbstractResponse createStreamsInitializeResponse() { return new StreamsInitializeResponse(new StreamsInitializeResponseData()); } private AbstractResponse createStreamsHeartbeatResponse() { return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData()); + } @Test public void testInvalidSaslHandShakeRequest() { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 842df9d7cbb..78c17bb0863 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -155,7 +155,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 11 + (short) 14 ), new ApiMessageAndVersion( new StreamsGroupMemberMetadataValue() @@ -200,7 +200,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupMemberMetadataKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 11 + (short) 14 ), null // Tombstone. ); @@ -293,7 +293,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 10 + (short) 13 ), new ApiMessageAndVersion( value, @@ -315,7 +315,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupPartitionMetadataKey() .setGroupId(groupId), - (short) 10 + (short) 13 ), null // Tombstone. ); @@ -346,25 +346,6 @@ public class GroupCoordinatorRecordHelpers { ); } - /** - * Creates a ConsumerGroupMetadata tombstone. - * - * @param groupId The consumer group id. - * @return The record. - */ - public static CoordinatorRecord newGroupEpochTombstoneRecord( - String groupId - ) { - return new CoordinatorRecord( - new ApiMessageAndVersion( - new ConsumerGroupMetadataKey() - .setGroupId(groupId), - (short) 3 - ), - null // Tombstone. - ); - } - public static CoordinatorRecord newStreamsGroupEpochRecord( String groupId, int newGroupEpoch @@ -373,7 +354,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupMetadataKey() .setGroupId(groupId), - (short) 9 + (short) 12 ), new ApiMessageAndVersion( new StreamsGroupMetadataValue() @@ -396,16 +377,16 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupMetadataKey() .setGroupId(groupId), - (short) 9 + (short) 12 ), null // Tombstone. ); } /** - * Creates a StreamsGroupMetadata tombstone. + * Creates a ConsumerGroupMetadata tombstone. * - * @param groupId The streams group id. + * @param groupId The consumer group id. * @return The record. */ public static CoordinatorRecord newConsumerGroupEpochTombstoneRecord( @@ -519,7 +500,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 13 + (short) 16 ), new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMemberValue() @@ -547,7 +528,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupTargetAssignmentMemberKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 13 + (short) 16 ), null // Tombstone. ); @@ -606,7 +587,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 12 + (short) 15 ), new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataValue() @@ -629,7 +610,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTargetAssignmentMetadataKey() .setGroupId(groupId), - (short) 12 + (short) 15 ), null // Tombstone. ); @@ -665,28 +646,6 @@ public class GroupCoordinatorRecordHelpers { ); } - /** - * Creates a ConsumerGroupCurrentMemberAssignment tombstone. - * - * @param groupId The consumer group id. - * @param memberId The consumer group member id. - * @return The record. - */ - public static CoordinatorRecord newCurrentAssignmentTombstoneRecord( - String groupId, - String memberId - ) { - return new CoordinatorRecord( - new ApiMessageAndVersion( - new ConsumerGroupCurrentMemberAssignmentKey() - .setGroupId(groupId) - .setMemberId(memberId), - (short) 8 - ), - null // Tombstone - ); - } - public static CoordinatorRecord newStreamsCurrentAssignmentRecord( String groupId, StreamsGroupMember member @@ -696,7 +655,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(member.memberId()), - (short) 14 + (short) 17 ), new ApiMessageAndVersion( new StreamsGroupCurrentMemberAssignmentValue() @@ -783,7 +742,7 @@ public class GroupCoordinatorRecordHelpers { new StreamsGroupCurrentMemberAssignmentKey() .setGroupId(groupId) .setMemberId(memberId), - (short) 14 + (short) 17 ), null // Tombstone ); @@ -1356,7 +1315,7 @@ public class GroupCoordinatorRecordHelpers { .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); }); - return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 15), + return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 18), new ApiMessageAndVersion(value, (short) 0)); } @@ -1373,7 +1332,7 @@ public class GroupCoordinatorRecordHelpers { new ApiMessageAndVersion( new StreamsGroupTopologyKey() .setGroupId(groupId), - (short) 15 + (short) 18 ), null // Tombstone ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 9f180c5798a..20042ebfb06 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -365,7 +365,7 @@ public class GroupCoordinatorService implements GroupCoordinator { return runtime.scheduleWriteOperation( "streams-group-initialize", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.streamsInitialize(context, request) ).exceptionally(exception -> handleOperationException( "streams-group-initialize", @@ -394,7 +394,7 @@ public class GroupCoordinatorService implements GroupCoordinator { return runtime.scheduleWriteOperation( "streams-heartbeat", topicPartitionFor(request.groupId()), - Duration.ofMillis(config.offsetCommitTimeoutMs), + Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.streamsHeartbeat(context, request) ).exceptionally(exception -> handleOperationException( "streams-heartbeat", diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 2d8e5229230..aa0a18d0bff 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -122,6 +122,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.coordinator.group.Utils.messageOrNull; + /** * The group coordinator shard is a replicated state machine that manages the metadata of all * classic and consumer groups. It holds the hard and the soft state of the groups. This class @@ -254,9 +256,9 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord .withShareGroupHeartbeatInterval(config.shareGroupHeartbeatIntervalMs()) // TODO: Do we need separate configs for streams groups? .withStreamsGroupAssignors(Collections.singletonList(new MockAssignor())) - .withStreamsGroupMaxSize(config.consumerGroupMaxSize) - .withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs) - .withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs) + .withStreamsGroupMaxSize(config.consumerGroupMaxSize()) + .withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs()) + .withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs()) .withGroupCoordinatorMetricsShard(metricsShard) .build(); @@ -851,56 +853,56 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord offset, producerId, (OffsetCommitKey) key.message(), - (OffsetCommitValue) Utils.messageOrNull(value) + (OffsetCommitValue) messageOrNull(value) ); break; case 2: groupMetadataManager.replay( (GroupMetadataKey) key.message(), - (GroupMetadataValue) Utils.messageOrNull(value) + (GroupMetadataValue) messageOrNull(value) ); break; case 3: groupMetadataManager.replay( (ConsumerGroupMetadataKey) key.message(), - (ConsumerGroupMetadataValue) Utils.messageOrNull(value) + (ConsumerGroupMetadataValue) messageOrNull(value) ); break; case 4: groupMetadataManager.replay( (ConsumerGroupPartitionMetadataKey) key.message(), - (ConsumerGroupPartitionMetadataValue) Utils.messageOrNull(value) + (ConsumerGroupPartitionMetadataValue) messageOrNull(value) ); break; case 5: groupMetadataManager.replay( (ConsumerGroupMemberMetadataKey) key.message(), - (ConsumerGroupMemberMetadataValue) Utils.messageOrNull(value) + (ConsumerGroupMemberMetadataValue) messageOrNull(value) ); break; case 6: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMetadataKey) key.message(), - (ConsumerGroupTargetAssignmentMetadataValue) Utils.messageOrNull(value) + (ConsumerGroupTargetAssignmentMetadataValue) messageOrNull(value) ); break; case 7: groupMetadataManager.replay( (ConsumerGroupTargetAssignmentMemberKey) key.message(), - (ConsumerGroupTargetAssignmentMemberValue) Utils.messageOrNull(value) + (ConsumerGroupTargetAssignmentMemberValue) messageOrNull(value) ); break; case 8: groupMetadataManager.replay( (ConsumerGroupCurrentMemberAssignmentKey) key.message(), - (ConsumerGroupCurrentMemberAssignmentValue) Utils.messageOrNull(value) + (ConsumerGroupCurrentMemberAssignmentValue) messageOrNull(value) ); break; @@ -914,14 +916,14 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord case 10: groupMetadataManager.replay( (ShareGroupMemberMetadataKey) key.message(), - (ShareGroupMemberMetadataValue) Utils.messageOrNull(value) + (ShareGroupMemberMetadataValue) messageOrNull(value) ); break; case 11: groupMetadataManager.replay( (ShareGroupMetadataKey) key.message(), - (ShareGroupMetadataValue) Utils.messageOrNull(value) + (ShareGroupMetadataValue) messageOrNull(value) ); break; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5997591b812..09dd21913e6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -5238,6 +5238,69 @@ public class GroupMetadataManager { } } + /** + * Replays StreamsGroupMetadataKey/Value to update the hard state of + * the Streams group. It updates the group epoch of the Streams + * group or deletes the Streams group. + * + * @param key A StreamsGroupMetadataKey key. + * @param value A StreamsGroupMetadataValue record. + */ + public void replay( + StreamsGroupMetadataKey key, + StreamsGroupMetadataValue value + ) { + String groupId = key.groupId(); + + if (value != null) { + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, true); + streamsGroup.setGroupEpoch(value.epoch()); + } else { + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false); + if (!streamsGroup.members().isEmpty()) { + throw new IllegalStateException("Received a tombstone record to delete group " + groupId + + " but the group still has " + streamsGroup.members().size() + " members."); + } + if (!streamsGroup.targetAssignment().isEmpty()) { + throw new IllegalStateException("Received a tombstone record to delete group " + groupId + + " but the target assignment still has " + streamsGroup.targetAssignment().size() + + " members."); + } + if (streamsGroup.assignmentEpoch() != -1) { + throw new IllegalStateException("Received a tombstone record to delete group " + groupId + + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone."); + } + removeGroup(groupId); + } + + } + + /** + * Replays StreamsGroupPartitionMetadataKey/Value to update the hard state of + * the streams group. It updates the subscription metadata of the streams + * group. + * + * @param key A StreamsGroupPartitionMetadataKey key. + * @param value A StreamsGroupPartitionMetadataValue record. + */ + public void replay( + StreamsGroupPartitionMetadataKey key, + StreamsGroupPartitionMetadataValue value + ) { + String groupId = key.groupId(); + StreamsGroup streamsGroup = getOrMaybeCreatePersistedStreamsGroup(groupId, false); + + if (value != null) { + Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata = new HashMap<>(); + value.topics().forEach(topicMetadata -> { + subscriptionMetadata.put(topicMetadata.topicName(), org.apache.kafka.coordinator.group.streams.TopicMetadata.fromRecord(topicMetadata)); + }); + streamsGroup.setSubscriptionMetadata(subscriptionMetadata); + } else { + streamsGroup.setSubscriptionMetadata(Collections.emptyMap()); + } + } + /** * Replays ShareGroupMemberMetadataKey/Value to update the hard state of * the share group. It updates the subscription part of the member or @@ -5299,6 +5362,7 @@ public class GroupMetadataManager { if (!shareGroup.members().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + shareGroup.members().size() + " members."); + } if (!shareGroup.targetAssignment().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + shareGroup.targetAssignment().size() diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index f83305e3e1a..7d3465c7ad2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -811,7 +811,8 @@ public class StreamsGroup implements Group { String memberId, String groupInstanceId, int memberEpoch, - boolean isTransactional + boolean isTransactional, + short apiVersion ) throws UnknownMemberIdException, StaleMemberEpochException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json index 30f775fa1f8..771f7324a9e 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupCurrentMemberAssignmentKey", - "validVersions": "14", + "validVersions": "17", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "14", + { "name": "GroupId", "type": "string", "versions": "17", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "14", + { "name": "MemberId", "type": "string", "versions": "17", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json index 0834b3e41ee..bac9ac247cf 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupMemberMetadataKey", - "validVersions": "11", + "validVersions": "14", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "11", + { "name": "GroupId", "type": "string", "versions": "14", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "11", + { "name": "MemberId", "type": "string", "versions": "14", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json index 8f287c05652..26121bf2ba2 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupMetadataKey", - "validVersions": "9", + "validVersions": "12", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "9", + { "name": "GroupId", "type": "string", "versions": "12", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json index 954e5fd1182..546a8f80535 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupPartitionMetadataKey", - "validVersions": "10", + "validVersions": "13", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "10", + { "name": "GroupId", "type": "string", "versions": "13", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json index 128bf44bba7..4fc8231ec3d 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json @@ -17,12 +17,12 @@ { "type": "data", "name": "StreamsGroupTargetAssignmentMemberKey", - "validVersions": "13", + "validVersions": "16", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "13", + { "name": "GroupId", "type": "string", "versions": "16", "about": "The group id." }, - { "name": "MemberId", "type": "string", "versions": "13", + { "name": "MemberId", "type": "string", "versions": "16", "about": "The member id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json index b583b3224b4..02b40f727c4 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupTargetAssignmentMetadataKey", - "validVersions": "12", + "validVersions": "15", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "12", + { "name": "GroupId", "type": "string", "versions": "15", "about": "The group id." } ] } diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json index 11873d4403c..261b755cd51 100644 --- a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json @@ -17,10 +17,10 @@ { "type": "data", "name": "StreamsGroupTopologyKey", - "validVersions": "15", + "validVersions": "18", "flexibleVersions": "none", "fields": [ - { "name": "GroupId", "type": "string", "versions": "15", + { "name": "GroupId", "type": "string", "versions": "18", "about": "The group id." } ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 480c1576440..d6258e13db0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -284,7 +284,7 @@ public class GroupCoordinatorRecordHelpersTest { new ApiMessageAndVersion( new StreamsGroupTopologyKey() .setGroupId("group-id"), - (short) 15), + (short) 18), new ApiMessageAndVersion( new StreamsGroupTopologyValue() .setTopology(expectedTopology), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index 4dd727b8050..68fddc0654f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -20,8 +20,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; @@ -605,31 +607,32 @@ public class StreamsGroupTest { } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testValidateOffsetCommit(boolean isTransactional) { + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { + boolean isTransactional = false; StreamsGroup group = createStreamsGroup("group-foo"); // Simulate a call from the admin client without member id and member epoch. // This should pass only if the group is empty. - group.validateOffsetCommit("", "", -1, isTransactional); + group.validateOffsetCommit("", "", -1, isTransactional, version); // The member does not exist. assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("member-id", null, 0, isTransactional)); + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); // Create a member. group.updateMember(new StreamsGroupMember.Builder("member-id").build()); // A call from the admin client should fail as the group is not empty. assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("", "", -1, isTransactional)); + group.validateOffsetCommit("", "", -1, isTransactional, version)); // The member epoch is stale. assertThrows(StaleMemberEpochException.class, () -> - group.validateOffsetCommit("member-id", "", 10, isTransactional)); + group.validateOffsetCommit("member-id", "", 10, isTransactional, version)); // This should succeed. - group.validateOffsetCommit("member-id", "", 0, isTransactional); + group.validateOffsetCommit("member-id", "", 0, isTransactional, version); } @Test diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index a747f0aa2a8..a2cb052e7f2 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -105,7 +105,8 @@ public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { // We set 2 timeout condition to fail the test before passing the verification: // (1) 10 min timeout, (2) 30 tries of polling without getting any data @ParameterizedTest - @CsvSource({"false, false", "true, false", "true, true"}) + @CsvSource({"false, false, true"}) +// @CsvSource({"false, false", "true, false", "true, true"}) public void shouldWorkWithRebalance( final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled,
