This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6a4aa0cf1d40d0585efd4b13f21d9ed6691baa7f Author: Lucas Brutschy <[email protected]> AuthorDate: Thu Aug 15 15:32:21 2024 +0200 Resolve conflicts from 11/25 trunk rebase - Implement DescribeStreamsGroup RPC handling Implement the DescribeStreamsGroup RPC handling for KIP-1071. --- .../org/apache/kafka/common/protocol/ApiKeys.java | 2 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java | 2 + .../requests/StreamsGroupDescribeRequest.java | 98 +++++++++ .../requests/StreamsGroupDescribeResponse.java | 76 +++++++ .../message/StreamsGroupDescribeRequest.json | 2 +- .../message/StreamsGroupDescribeResponse.json | 10 +- .../message/StreamsGroupHeartbeatRequest.json | 2 +- .../message/StreamsGroupInitializeRequest.json | 2 +- .../StreamsGroupHeartbeatRequestManagerTest.java | 3 +- .../kafka/common/requests/RequestResponseTest.java | 57 +++--- .../group/GroupCoordinatorAdapter.scala | 11 +- core/src/main/scala/kafka/server/KafkaApis.scala | 66 +++++- core/src/main/scala/kafka/server/KafkaConfig.scala | 7 + .../scala/unit/kafka/server/KafkaApisTest.scala | 224 +++++++++++++++++---- .../scala/unit/kafka/server/KafkaConfigTest.scala | 6 + gradle/spotbugs-exclude.xml | 5 + .../kafka/coordinator/group/GroupCoordinator.java | 14 ++ .../coordinator/group/GroupCoordinatorService.java | 53 +++++ .../coordinator/group/GroupCoordinatorShard.java | 16 ++ .../coordinator/group/GroupMetadataManager.java | 35 +++- .../group/streams/CurrentAssignmentBuilder.java | 8 +- .../coordinator/group/streams/StreamsGroup.java | 109 ++-------- .../group/streams/StreamsGroupMember.java | 109 +++++----- .../coordinator/group/streams/StreamsTopology.java | 33 +++ .../group/streams/TargetAssignmentBuilder.java | 19 -- .../group/GroupCoordinatorServiceTest.java | 138 +++++++++++++ .../group/GroupCoordinatorShardTest.java | 2 +- .../group/GroupMetadataManagerTest.java | 112 +++++++++++ .../group/GroupMetadataManagerTestContext.java | 94 ++++++++- .../group/streams/StreamsGroupBuilder.java | 101 ++++++++++ .../group/streams/StreamsGroupMemberTest.java | 127 +++++++++++- .../group/streams/StreamsGroupTest.java | 4 +- .../group/streams/StreamsTopologyTest.java | 182 +++++++++++++++++ .../group/streams/TargetAssignmentBuilderTest.java | 5 +- 35 files changed, 1463 insertions(+), 273 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a53b082c997..345b9b808a8 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -131,8 +131,8 @@ public enum ApiKeys { WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true), DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true), READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), - STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT), STREAMS_GROUP_INITIALIZE(ApiMessageType.STREAMS_GROUP_INITIALIZE), + STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT), STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 3d99419987d..4e5d21251f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -354,6 +354,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion); case STREAMS_GROUP_INITIALIZE: return StreamsGroupInitializeRequest.parse(buffer, apiVersion); + case STREAMS_GROUP_DESCRIBE: + return StreamsGroupDescribeRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 9e25cfd99da..25f3afe0d5b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -291,6 +291,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return StreamsGroupHeartbeatResponse.parse(responseBuffer, version); case STREAMS_GROUP_INITIALIZE: return StreamsGroupInitializeResponse.parse(responseBuffer, version); + case STREAMS_GROUP_DESCRIBE: + return StreamsGroupDescribeResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java new file mode 100644 index 00000000000..2a3834d16a8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.StreamsGroupDescribeRequestData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +public class StreamsGroupDescribeRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> { + + private final StreamsGroupDescribeRequestData data; + + public Builder(StreamsGroupDescribeRequestData data) { + this(data, false); + } + + public Builder(StreamsGroupDescribeRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsGroupDescribeRequest build(short version) { + return new StreamsGroupDescribeRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsGroupDescribeRequestData data; + + public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short version) { + super(ApiKeys.STREAMS_GROUP_DESCRIBE, version); + this.data = data; + } + + @Override + public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) { + StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() + .setThrottleTimeMs(throttleTimeMs); + // Set error for each group + this.data.groupIds().forEach( + groupId -> data.groups().add( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.forException(e).code()) + ) + ); + return new StreamsGroupDescribeResponse(data); + } + + @Override + public StreamsGroupDescribeRequestData data() { + return data; + } + + public static StreamsGroupDescribeRequest parse(ByteBuffer buffer, short version) { + return new StreamsGroupDescribeRequest( + new StreamsGroupDescribeRequestData(new ByteBufferAccessor(buffer), version), + version + ); + } + + public static List<StreamsGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList( + List<String> groupIds, + Errors error + ) { + return groupIds.stream() + .map(groupId -> new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(error.code()) + ).collect(Collectors.toList()); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java new file mode 100644 index 00000000000..cf1d5529623 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + */ +public class StreamsGroupDescribeResponse extends AbstractResponse { + + private final StreamsGroupDescribeResponseData data; + + public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) { + super(ApiKeys.STREAMS_GROUP_DESCRIBE); + this.data = data; + } + + @Override + public StreamsGroupDescribeResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + HashMap<Errors, Integer> counts = new HashMap<>(); + data.groups().forEach( + group -> updateErrorCounts(counts, Errors.forCode(group.errorCode())) + ); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsGroupDescribeResponse parse(ByteBuffer buffer, short version) { + return new StreamsGroupDescribeResponse( + new StreamsGroupDescribeResponseData(new ByteBufferAccessor(buffer), version) + ); + } +} diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json index 17b4abac9d2..b7047141e8a 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json @@ -16,7 +16,7 @@ { "apiKey": 90, "type": "request", - "listeners": ["broker"], + "listeners": ["broker", "zkBroker"], "name": "StreamsGroupDescribeRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 1093100992f..9cf580f0441 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -45,8 +45,8 @@ "about": "The group epoch." }, { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", "about": "The assignment epoch." }, - { "name": "Topology", "type": "[]Subtopology", "versions": "0+", - "about": "The sub-topologies of the streams application.", + { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The sub-topologies of the streams application. Null if uninitialized.", "fields": [ { "name": "Subtopology", "type": "string", "versions": "0+", "about": "String to uniquely identify the sub-topology." }, @@ -54,8 +54,8 @@ "about": "The topics the topology reads from." }, { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "about": "The regular expressions identifying topics the topology reads from." }, - { "name": "SinkTopics", "type": "[]string", "versions": "0+", - "about": "The topics the topology writes to." }, + { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+", + "about": "The repartition topics the topology writes to." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", "about": "The set of state changelog topics associated with this sub-topology. Created automatically." }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", @@ -82,7 +82,7 @@ { "name": "TopologyId", "type": "string", "versions": "0+", "about": "The ID of the topology. Must be non-empty." }, - { "name": "ProcessId", "type": "uuid", "versions": "0+", + { "name": "ProcessId", "type": "string", "versions": "0+", "about": "Identity of the streams instance that may have multiple clients. " }, { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "about": "Used for rack-aware assignment algorithm." }, diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json index ac765b2269c..7f829f1e410 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json @@ -16,7 +16,7 @@ { "apiKey": 89, "type": "request", - "listeners": ["broker"], + "listeners": ["broker", "zkBroker"], "name": "StreamsGroupHeartbeatRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json index 169629fb2c0..602b557e3e9 100644 --- a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json @@ -16,7 +16,7 @@ { "apiKey": 88, "type": "request", - "listeners": ["broker"], + "listeners": ["broker", "zkBroker"], "name": "StreamsGroupInitializeRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index bc91fc5df83..f4c67e1d9b9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -214,7 +214,8 @@ class StreamsGroupHeartbeatRequestManagerTest { StreamsGroupHeartbeatRequest request = (StreamsGroupHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build(); assertEquals(processID.toString(), request.data().processId()); - assertEquals(endPoint, request.data().userEndpoint()); + assertEquals(endPoint.host, request.data().userEndpoint().host()); + assertEquals(endPoint.port, request.data().userEndpoint().port()); assertEquals(1, request.data().clientTags().size()); assertEquals("clientTag1", request.data().clientTags().get(0).key()); assertEquals("value2", request.data().clientTags().get(0).value()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 5bf5baf7c8b..1e5a1c4c864 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -234,6 +234,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState; import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState; import org.apache.kafka.common.message.StopReplicaResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeRequestData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -1127,8 +1129,9 @@ public class RequestResponseTest { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version); - case STREAMS_GROUP_HEARTBEAT: return createStreamsHeartbeatRequest(version); - case STREAMS_GROUP_INITIALIZE: return createStreamsInitializeRequest(version); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version); + case STREAMS_GROUP_INITIALIZE: return createStreamsGroupInitializeRequest(version); + case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1223,8 +1226,9 @@ public class RequestResponseTest { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse(); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse(); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); - case STREAMS_GROUP_HEARTBEAT: return createStreamsHeartbeatResponse(); - case STREAMS_GROUP_INITIALIZE: return createStreamsInitializeResponse(); + case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(); + case STREAMS_GROUP_INITIALIZE: return createStreamsGroupInitializeResponse(); + case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -4023,35 +4027,42 @@ public class RequestResponseTest { return new ReadShareGroupStateSummaryResponse(data); } -// private AbstractRequest createStreamsPrepareAssignmentRequest(final short version) { -// return new StreamsPrepareAssignmentRequest.Builder(new StreamsPrepareAssignmentRequestData()).build(version); -// } -// -// private AbstractRequest createStreamsInstallAssignmentRequest(final short version) { -// return new StreamsInstallAssignmentRequest.Builder(new StreamsInstallAssignmentRequestData()).build(version); -// } + private AbstractRequest createStreamsGroupDescribeRequest(final short version) { + return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData() + .setGroupIds(Collections.singletonList("group")) + .setIncludeAuthorizedOperations(false)).build(version); + } - private AbstractRequest createStreamsInitializeRequest(final short version) { + private AbstractRequest createStreamsGroupInitializeRequest(final short version) { return new StreamsGroupInitializeRequest.Builder(new StreamsGroupInitializeRequestData()).build(version); } - private AbstractRequest createStreamsHeartbeatRequest(final short version) { + private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) { return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version); } -// private AbstractResponse createStreamsPrepareAssignmentResponse() { -// return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData()); -// } -// -// private AbstractResponse createStreamsInstallAssignmentResponse() { -// return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData()); -// } -// - private AbstractResponse createStreamsInitializeResponse() { + private AbstractResponse createStreamsGroupDescribeResponse() { + StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData() + .setGroups(Collections.singletonList( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group") + .setErrorCode((short) 0) + .setErrorMessage(Errors.forCode((short) 0).message()) + .setGroupState("EMPTY") + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setMembers(new ArrayList<>(0)) + .setTopology(new ArrayList<>(0)) + )) + .setThrottleTimeMs(1000); + return new StreamsGroupDescribeResponse(data); + } + + private AbstractResponse createStreamsGroupInitializeResponse() { return new StreamsGroupInitializeResponse(new StreamsGroupInitializeResponseData()); } - private AbstractResponse createStreamsHeartbeatResponse() { + private AbstractResponse createStreamsGroupHeartbeatResponse() { return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData()); } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 74f30f0d765..a48b2c6ffc8 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -18,7 +18,7 @@ package kafka.coordinator.group import kafka.server.{KafkaConfig, ReplicaManager} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] +import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch @@ -672,6 +672,15 @@ private[group] class GroupCoordinatorAdapter( )) } + override def streamsGroupDescribe( + context: RequestContext, + groupIds: util.List[String] + ): CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]] = { + FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( + s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_DESCRIBE.name} API." + )) + } + override def shareGroupDescribe( context: RequestContext, groupIds: util.List[String] diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9b22b16a55f..d3bfd420ace 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -276,6 +276,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) + case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError) case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsHeartbeat(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") @@ -3885,12 +3886,16 @@ class KafkaApis(val requestChannel: RequestChannel, } + private def isStreamsGroupProtocolEnabled(): Boolean = { + config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) + } + def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { val streamsInitializeRequest = request.body[StreamsGroupInitializeRequest] // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS - if (!config.isNewGroupCoordinatorEnabled) { + if (!isStreamsGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) @@ -3915,7 +3920,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleStreamsHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = { val streamsHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest] - if (!config.isNewGroupCoordinatorEnabled) { + if (!isStreamsGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) @@ -3937,6 +3942,63 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleStreamsGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { + val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest] + val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations + + if (!isStreamsGroupProtocolEnabled()) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, request.body[StreamsGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val response = new StreamsGroupDescribeResponseData() + + val authorizedGroups = new ArrayBuffer[String]() + streamsGroupDescribeRequest.data.groupIds.forEach { groupId => + if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { + response.groups.add(new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + ) + } else { + authorizedGroups += groupId + } + } + + groupCoordinator.streamsGroupDescribe( + request.context, + authorizedGroups.asJava + ).handle[Unit] { (results, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(exception)) + } else { + if (includeAuthorizedOperations) { + results.forEach { groupResult => + if (groupResult.errorCode == Errors.NONE.code) { + groupResult.setAuthorizedOperations(authHelper.authorizedOperations( + request, + new Resource(ResourceType.GROUP, groupResult.groupId) + )) + } + } + } + + if (response.groups.isEmpty) { + // If the response is empty, we can directly reuse the results. + response.setGroups(results) + } else { + // Otherwise, we have to copy the results into the existing ones. + response.groups.addAll(results) + } + + requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response)) + } + } + } + + } + def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 18327a59b55..3b9aa091b8a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -599,6 +599,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " + "This is part of the early access of KIP-932 and MUST NOT be used in production.") } + if (protocols.contains(GroupType.STREAMS)) { + if (processRoles.isEmpty) { + throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft cluster.") + } + warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " + + "This is part of the preview of KIP-1071 and MUST NOT be used in production.") + } protocols } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 917c47aeea2..202149fe747 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11235,42 +11235,48 @@ class KafkaApisTest extends Logging { } @Test - def testStreamsInitializeRequest(): Unit = { - val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") + def testStreamsGroupInitializeRequest(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build()) + val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) val future = new CompletableFuture[StreamsGroupInitializeResponseData]() when(groupCoordinator.streamsInitialize( requestChannelRequest.context, - streamsInitializeRequest + streamsGroupInitializeRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) - val streamsInitializeResponse = new StreamsGroupInitializeResponseData() + val streamsGroupInitializeResponse = new StreamsGroupInitializeResponseData() - future.complete(streamsInitializeResponse) + future.complete(streamsGroupInitializeResponse) val response = verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest) - assertEquals(streamsInitializeResponse, response.data) + assertEquals(streamsGroupInitializeResponse, response.data) } @Test - def testStreamsInitializeRequestFutureFailed(): Unit = { - val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") + def testStreamsGroupInitializeRequestFutureFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) - val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build()) + val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) val future = new CompletableFuture[StreamsGroupInitializeResponseData]() when(groupCoordinator.streamsInitialize( requestChannelRequest.context, - streamsInitializeRequest + streamsGroupInitializeRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) @@ -11279,17 +11285,20 @@ class KafkaApisTest extends Logging { } @Test - def testStreamsInitializeRequestAuthorizationFailed(): Unit = { - val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") + def testStreamsGroupInitializeRequestAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build()) + val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) val authorizer: Authorizer = mock(classOf[Authorizer]) when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) @@ -11299,43 +11308,49 @@ class KafkaApisTest extends Logging { @Test - def testStreamsHeartbeatRequest(): Unit = { - val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + def testStreamsGroupHeartbeatRequest(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) - val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build()) + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]() when(groupCoordinator.streamsHeartbeat( requestChannelRequest.context, - streamsHeartbeatRequest + streamsGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) - val streamsHeartbeatResponse = new StreamsGroupHeartbeatResponseData() + val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") - future.complete(streamsHeartbeatResponse) + future.complete(streamsGroupHeartbeatResponse) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) - assertEquals(streamsHeartbeatResponse, response.data) + assertEquals(streamsGroupHeartbeatResponse, response.data) } @Test - def testStreamsHeartbeatRequestFutureFailed(): Unit = { - val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build()) + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]() when(groupCoordinator.streamsHeartbeat( requestChannelRequest.context, - streamsHeartbeatRequest + streamsGroupHeartbeatRequest )).thenReturn(future) - kafkaApis = createKafkaApis(overrideProperties = Map( - GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" - )) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) @@ -11344,17 +11359,20 @@ class KafkaApisTest extends Logging { } @Test - def testStreamsHeartbeatRequestAuthorizationFailed(): Unit = { - val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) - val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build()) + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) val authorizer: Authorizer = mock(classOf[Authorizer]) when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) .thenReturn(Seq(AuthorizationResult.DENIED).asJava) kafkaApis = createKafkaApis( authorizer = Some(authorizer), - overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) @@ -11483,6 +11501,128 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() + .setIncludeAuthorizedOperations(includeAuthorizedOperations) + streamsGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.streamsGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.complete(List( + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)), + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2)) + ).asJava) + + var authorizedOperationsInt = Int.MinValue; + if (includeAuthorizedOperations) { + authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) + } + + // Can't reuse the above list here because we would not test the implementation in KafkaApis then + val describedGroups = List( + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)), + new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2)) + ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) + val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData() + .setGroups(describedGroups.asJava) + + val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) + + assertEquals(expectedStreamsGroupDescribeResponseData, response.data) + } + + @Test + def testStreamsGroupDescribeReturnsUnsupportedVersion(): Unit = { + metadataCache = mock(classOf[ZkMetadataCache]) + + val groupId = "group0" + val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() + streamsGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build()) + + val errorCode = Errors.UNSUPPORTED_VERSION.code + val expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode) + val expectedResponse = new StreamsGroupDescribeResponseData() + expectedResponse.groups.add(expectedDescribedGroup) + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) + + assertEquals(expectedResponse, response.data) + } + + @Test + def testStreamsGroupDescribeAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() + streamsGroupDescribeRequestData.groupIds.add("group-id") + val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.streamsGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + future.complete(List().asJava) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode) + } + + @Test + def testStreamsGroupDescribeFutureFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + + val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() + streamsGroupDescribeRequestData.groupIds.add("group-id") + val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.streamsGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"), + raftSupport = true + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) + val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) + assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) + } + @Test def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { val data = new GetTelemetrySubscriptionsRequestData() diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1c5c2d3b0cd..28717aee87b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1938,6 +1938,12 @@ class KafkaConfigTest { assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols) assertTrue(config.isNewGroupCoordinatorEnabled) assertTrue(config.shareGroupConfig.isShareGroupEnabled) + + // This is OK. + props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,streams") + val config2 = KafkaConfig.fromProps(props) + assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS), config2.groupCoordinatorRebalanceProtocols) + assertTrue(config2.isNewGroupCoordinatorEnabled) } @Test diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index b5a3c9bd96e..2b3c5b128d8 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -562,4 +562,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read <Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/> </Match> + <Match> + <Class name="org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder"/> + <Bug pattern="URF_UNREAD_FIELD"/> + </Match> + </FindBugsFilter> diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index f25b5c39461..cea5da08608 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -229,6 +230,19 @@ public interface GroupCoordinator { List<String> groupIds ); + /** + * Describe streams groups. + * + * @param context The coordinator request context. + * @param groupIds The group ids. + * + * @return A future yielding the results or an exception. + */ + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe( + RequestContext context, + List<String> groupIds + ); + /** * Describe share groups. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index b2e2460064c..5d632528961 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -60,6 +61,7 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.ShareGroupDescribeRequest; +import org.apache.kafka.common.requests.StreamsGroupDescribeRequest; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.utils.BufferSupplier; @@ -723,6 +725,57 @@ public class GroupCoordinatorService implements GroupCoordinator { return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); } + /** + * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList( + groupIds, + Errors.COORDINATOR_NOT_AVAILABLE + )); + } + + final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + if (isGroupIdNotEmpty(groupId)) { + groupsByTopicPartition + .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>()) + .add(groupId); + } else { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + ))); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future = + runtime.scheduleReadOperation( + "streams-group-describe", + topicPartition, + (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupIds, lastCommittedOffset) + ).exceptionally(exception -> handleOperationException( + "streams-group-describe", + groupList, + exception, + (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error) + )); + + futures.add(future); + }); + + return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); + } + /** * See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 4816649d9f6..dff55c9b684 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -645,6 +646,21 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord return groupMetadataManager.consumerGroupDescribe(groupIds, committedOffset); } + /** + * Handles a StreamsGroupDescribe request. + * + * @param groupIds The IDs of the groups to describe. + * + * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup. + * + */ + public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + return groupMetadataManager.streamsGroupDescribe(groupIds, committedOffset); + } + /** * Handles a ShareGroupDescribe request. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1d95cad5a7e..e934304b168 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue; @@ -863,6 +864,33 @@ public class GroupMetadataManager { return describedGroups; } + /** + * Handles a StreamsGroupDescribe request. + * + * @param groupIds The IDs of the groups to describe. + * @param committedOffset A specified committed offset corresponding to this shard. + * + * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup. + */ + public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + final List<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<>(); + groupIds.forEach(groupId -> { + try { + describedGroups.add(streamsGroup(groupId, committedOffset).asDescribedGroup(committedOffset)); + } catch (GroupIdNotFoundException exception) { + describedGroups.add(new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + ); + } + }); + + return describedGroups; + } + /** * Handles a DescribeGroup request. * @@ -3839,10 +3867,8 @@ public class GroupMetadataManager { boolean staticMemberReplaced, List<CoordinatorRecord> records ) { - String preferredServerAssignor = group.computePreferredServerAssignor( - member, - updatedMember - ).orElse(defaultTaskAssignor.name()); + // TODO: Read the preferred server assignor from the group configuration + String preferredServerAssignor = defaultTaskAssignor.name(); try { org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder = new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(group.groupId(), groupEpoch, taskAssignors.get(preferredServerAssignor)) @@ -3851,7 +3877,6 @@ public class GroupMetadataManager { .withStaticMembers(group.staticMembers()) .withSubscriptionMetadata(subscriptionMetadata) .withTargetAssignment(group.targetAssignment()) - .withTopicsImage(metadataImage.topics()) .addOrUpdateMember(updatedMember.memberId(), updatedMember); org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; // A new static member is replacing an older one with the same subscriptions. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java index 19eb79e36e6..f7690381708 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java @@ -182,7 +182,7 @@ public class CurrentAssignmentBuilder { // If the member provides its owned tasks. We verify if it still // owns any of the revoked tasks. If it does, we cannot progress. - if (ownsRevokedTasks(member.activeTasksPendingRevocation())) { + if (ownsRevokedActiveTasks(member.activeTasksPendingRevocation())) { return member; } @@ -229,12 +229,12 @@ public class CurrentAssignmentBuilder { } /** - * Decides whether the current ownedTopicTasks contains any partition that is pending revocation. + * Decides whether the current ownedActiveTasks contains any partition that is pending revocation. * * @param assignment The assignment that has the tasks pending revocation. * @return A boolean based on the condition mentioned above. */ - private boolean ownsRevokedTasks( + private boolean ownsRevokedActiveTasks( Map<String, Set<Integer>> assignment ) { if (ownedActiveTasks == null) { @@ -313,7 +313,7 @@ public class CurrentAssignmentBuilder { } } - if (!newTasksPendingRevocation.isEmpty() && ownsRevokedTasks(newTasksPendingRevocation)) { + if (!newTasksPendingRevocation.isEmpty() && ownsRevokedActiveTasks(newTasksPendingRevocation)) { // If there are tasks to be revoked, the member remains in its current // epoch and requests the revocation of those tasks. It transitions to // the UNREVOKED_TASKS state to wait until the client acknowledges the diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 09dcf7817a5..1d4c2ace5e1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.coordinator.group.CoordinatorRecord; import org.apache.kafka.coordinator.group.Group; @@ -131,11 +132,6 @@ public class StreamsGroup implements Group { */ private final TimelineHashMap<String, String> staticMembers; - /** - * The number of members supporting each assignor name. - */ - private final TimelineHashMap<String, Integer> assignors; - /** * The metadata associated with each subscribed topic name. */ @@ -175,7 +171,7 @@ public class StreamsGroup implements Group { /** * The Streams topology. */ - private TimelineObject<Optional<StreamsTopology>> topology; + private final TimelineObject<Optional<StreamsTopology>> topology; /** * The metadata refresh deadline. It consists of a timestamp in milliseconds together with the group epoch at the time of setting it. @@ -197,7 +193,6 @@ public class StreamsGroup implements Group { this.groupEpoch = new TimelineInteger(snapshotRegistry); this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0); - this.assignors = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); @@ -369,7 +364,6 @@ public class StreamsGroup implements Group { maybeUpdateTaskEpoch(oldMember, newMember); updateStaticMember(newMember); maybeUpdateGroupState(); - maybeUpdateAssignors(oldMember, newMember); } /** @@ -461,36 +455,6 @@ public class StreamsGroup implements Group { return Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment); } - /** - * Updates the server assignors count. - * - * @param oldMember The old member. - * @param newMember The new member. - */ - private void maybeUpdateAssignors( - StreamsGroupMember oldMember, - StreamsGroupMember newMember - ) { - maybeUpdateAssignors(assignors, oldMember, newMember); - } - - private static void maybeUpdateAssignors( - Map<String, Integer> serverAssignorCount, - StreamsGroupMember oldMember, - StreamsGroupMember newMember - ) { - if (oldMember != null) { - oldMember.assignor().ifPresent(name -> - serverAssignorCount.compute(name, StreamsGroup::decValue) - ); - } - if (newMember != null) { - newMember.assignor().ifPresent(name -> - serverAssignorCount.compute(name, StreamsGroup::incValue) - ); - } - } - /** * Updates the target assignment of a member. * @@ -656,43 +620,6 @@ public class StreamsGroup implements Group { } } - /** - * Compute the preferred (server side) assignor for the group while taking into account the updated member. The computation relies on - * {{@link StreamsGroup#assignors}} persisted structure but it does not update it. - * - * @param oldMember The old member. - * @param newMember The new member. - * @return An Optional containing the preferred assignor. - */ - public Optional<String> computePreferredServerAssignor( - StreamsGroupMember oldMember, - StreamsGroupMember newMember - ) { - // Copy the current count and update it. - Map<String, Integer> counts = new HashMap<>(this.assignors); - maybeUpdateAssignors(counts, oldMember, newMember); - - return counts.entrySet().stream() - .max(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey); - } - - /** - * @return The preferred assignor for the group. - */ - public Optional<String> preferredServerAssignor() { - return preferredServerAssignor(Long.MAX_VALUE); - } - - /** - * @return The preferred assignor for the group with given offset. - */ - public Optional<String> preferredServerAssignor(long committedOffset) { - return assignors.entrySet(committedOffset).stream() - .max(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey); - } - /** * @return An immutable Map of subscription metadata for each topic that the consumer group is subscribed to. */ @@ -1097,21 +1024,23 @@ public class StreamsGroup implements Group { }); } - /** - * Decrements value by 1; returns null when reaching zero. This helper is meant to be used with Map#compute. - */ - private static Integer decValue(String key, Integer value) { - if (value == null) { - return null; - } - return value == 1 ? null : value - 1; - } - - /** - * Increments value by 1; This helper is meant to be used with Map#compute. - */ - private static Integer incValue(String key, Integer value) { - return value == null ? 1 : value + 1; + public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup( + long committedOffset + ) { + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setGroupEpoch(groupEpoch.get(committedOffset)) + .setGroupState(state.get(committedOffset).toString()) + .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset)) + .setTopology(topology.get(committedOffset).map(StreamsTopology::asStreamsGroupDescribeTopology).orElse(null)); + members.entrySet(committedOffset).forEach( + entry -> describedGroup.members().add( + entry.getValue().asStreamsGroupDescribeMember( + targetAssignment.get(entry.getValue().memberId(), committedOffset) + ) + ) + ); + return describedGroup; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java index 5ad4370a5ab..e50e450feb4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -16,12 +16,9 @@ */ package org.apache.kafka.coordinator.group.streams; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; -import org.apache.kafka.image.TopicImage; -import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; import java.util.Collections; @@ -40,7 +37,7 @@ import java.util.stream.Collectors; */ public class StreamsGroupMember { - /** + /** * A builder that facilitates the creation of a new member or the update of an existing one. * <p> * Please refer to the javadoc of {{@link StreamsGroupMember}} for the definition of the fields. @@ -57,10 +54,9 @@ public class StreamsGroupMember { private String clientId = ""; private String clientHost = ""; private String topologyId; - private String assignor; private String processId; private StreamsGroupMemberMetadataValue.Endpoint userEndpoint; - private Map<String, String> clientTags; + private Map<String, String> clientTags = Collections.emptyMap(); private Map<String, Set<Integer>> assignedActiveTasks = Collections.emptyMap(); private Map<String, Set<Integer>> assignedStandbyTasks = Collections.emptyMap(); private Map<String, Set<Integer>> assignedWarmupTasks = Collections.emptyMap(); @@ -82,7 +78,6 @@ public class StreamsGroupMember { this.clientId = member.clientId; this.clientHost = member.clientHost; this.topologyId = member.topologyId; - this.assignor = member.assignor; this.processId = member.processId; this.userEndpoint = member.userEndpoint; this.clientTags = member.clientTags; @@ -140,11 +135,6 @@ public class StreamsGroupMember { return this; } - public StreamsGroupMember.Builder maybeUpdateAssignor(Optional<String> assignor) { - this.assignor = assignor.orElse(this.assignor); - return this; - } - public Builder setClientId(String clientId) { this.clientId = clientId; return this; @@ -170,11 +160,6 @@ public class StreamsGroupMember { return this; } - public Builder setAssignor(String assignor) { - this.assignor = assignor; - return this; - } - public Builder setProcessId(String processId) { this.processId = processId; return this; @@ -271,7 +256,6 @@ public class StreamsGroupMember { clientId, clientHost, topologyId, - assignor, processId, userEndpoint, clientTags, @@ -334,11 +318,6 @@ public class StreamsGroupMember { */ private final String topologyId; - /** - * The assignor - */ - private final String assignor; - /** * The process ID */ @@ -385,7 +364,6 @@ public class StreamsGroupMember { String clientId, String clientHost, String topologyId, - String assignor, String processId, StreamsGroupMemberMetadataValue.Endpoint userEndpoint, Map<String, String> clientTags, @@ -405,7 +383,6 @@ public class StreamsGroupMember { this.clientId = clientId; this.clientHost = clientHost; this.topologyId = topologyId; - this.assignor = assignor; this.processId = processId; this.userEndpoint = userEndpoint; this.clientTags = clientTags; @@ -478,13 +455,6 @@ public class StreamsGroupMember { return topologyId; } - /** - * @return The assignor - */ - public Optional<String> assignor() { - return Optional.ofNullable(assignor); - } - /** * @return The process ID */ @@ -548,33 +518,58 @@ public class StreamsGroupMember { return activeTasksPendingRevocation; } - private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap( - Map<Uuid, Set<Integer>> partitions, - TopicsImage topicsImage + /** + * @param targetAssignment The target assignment of this member in the corresponding group. + * + * @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member. + */ + public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember( + Assignment targetAssignment ) { - List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>(); - partitions.forEach((topicId, partitionSet) -> { - String topicName = lookupTopicNameById(topicId, topicsImage); - if (topicName != null) { - topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions() - .setTopicId(topicId) - .setTopicName(topicName) - .setPartitions(new ArrayList<>(partitionSet))); - } - }); - return topicPartitions; + final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment = + new StreamsGroupDescribeResponseData.Assignment(); + + if (targetAssignment != null) { + describedTargetAssignment + .setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks())) + .setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks())) + .setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks())); + } + + return new StreamsGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(memberId) + .setAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(taskIdsFromMap(assignedActiveTasks)) + .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks)) + .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks))) + .setTargetAssignment(describedTargetAssignment) + .setClientHost(clientHost) + .setClientId(clientId) + .setInstanceId(instanceId) + .setRackId(rackId) + .setClientTags(clientTags.entrySet().stream().map( + entry -> new StreamsGroupDescribeResponseData.KeyValue() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + ).collect(Collectors.toList())) + .setProcessId(processId) + .setTopologyId(topologyId); + // TODO: TaskOffset and TaskEndOffset are missing. + } - private static String lookupTopicNameById( - Uuid topicId, - TopicsImage topicsImage + private static List<StreamsGroupDescribeResponseData.TaskIds> taskIdsFromMap( + Map<String, Set<Integer>> tasks ) { - TopicImage topicImage = topicsImage.getTopic(topicId); - if (topicImage != null) { - return topicImage.name(); - } else { - return null; - } + List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new ArrayList<>(); + tasks.forEach((subtopologyId, partitionSet) -> { + taskIds.add(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subtopologyId) + .setPartitions(new ArrayList<>(partitionSet))); + }); + return taskIds; } @SuppressWarnings("checkstyle:CyclomaticComplexity") @@ -597,7 +592,6 @@ public class StreamsGroupMember { && Objects.equals(clientId, that.clientId) && Objects.equals(clientHost, that.clientHost) && Objects.deepEquals(topologyId, that.topologyId) - && Objects.equals(assignor, that.assignor) && Objects.equals(processId, that.processId) && Objects.equals(userEndpoint, that.userEndpoint) && Objects.equals(clientTags, that.clientTags) @@ -619,7 +613,6 @@ public class StreamsGroupMember { result = 31 * result + Objects.hashCode(clientId); result = 31 * result + Objects.hashCode(clientHost); result = 31 * result + Objects.hashCode(topologyId); - result = 31 * result + Objects.hashCode(assignor); result = 31 * result + Objects.hashCode(processId); result = 31 * result + Objects.hashCode(userEndpoint); result = 31 * result + Objects.hashCode(clientTags); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java index e0bec1c292a..5f09941fae9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.coordinator.group.streams; +import java.util.List; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; @@ -88,4 +90,35 @@ public class StreamsTopology { ", subtopologies=" + subtopologies + '}'; } + + public List<StreamsGroupDescribeResponseData.Subtopology> asStreamsGroupDescribeTopology() { + return subtopologies.values().stream().map( + subtopology -> new StreamsGroupDescribeResponseData.Subtopology() + .setSourceTopicRegex(subtopology.sourceTopicRegex()) + .setSubtopology(subtopology.subtopology()) + .setSourceTopics(subtopology.sourceTopics()) + .setRepartitionSinkTopics(subtopology.repartitionSinkTopics()) + .setRepartitionSourceTopics( + asStreamsGroupDescribeTopicInfo(subtopology.repartitionSourceTopics())) + .setStateChangelogTopics( + asStreamsGroupDescribeTopicInfo(subtopology.stateChangelogTopics())) + ).collect(Collectors.toList()); + } + + private static List<StreamsGroupDescribeResponseData.TopicInfo> asStreamsGroupDescribeTopicInfo( + final List<TopicInfo> topicInfos) { + return topicInfos.stream().map(x -> + new StreamsGroupDescribeResponseData.TopicInfo() + .setName(x.name()) + .setPartitions(x.partitions()) + .setTopicConfigs( + x.topicConfigs() != null ? + x.topicConfigs().stream().map( + y -> new StreamsGroupDescribeResponseData.KeyValue() + .setKey(y.key()) + .setValue(y.value()) + ).collect(Collectors.toList()) : null + ) + ).collect(Collectors.toList()); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java index 1fd34e27e89..51e07c3418c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -23,7 +23,6 @@ import org.apache.kafka.coordinator.group.taskassignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.taskassignor.MemberAssignment; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignorException; -import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; import java.util.Collections; @@ -115,11 +114,6 @@ public class TargetAssignmentBuilder { */ private Map<String, org.apache.kafka.coordinator.group.streams.Assignment> targetAssignment = Collections.emptyMap(); - /** - * The topics image. - */ - private TopicsImage topicsImage = TopicsImage.EMPTY; - /** * The topology. */ @@ -204,19 +198,6 @@ public class TargetAssignmentBuilder { return this; } - /** - * Adds the topics image. - * - * @param topicsImage The topics image. - * @return This object. - */ - public TargetAssignmentBuilder withTopicsImage( - TopicsImage topicsImage - ) { - this.topicsImage = topicsImage; - return this; - } - /** * Adds the topology image. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index fcdd221c907..5b1aad09027 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -1771,6 +1772,143 @@ public class GroupCoordinatorServiceTest { ); } + @Test + public void testStreamsGroupDescribe() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + int partitionCount = 2; + service.startup(() -> partitionCount); + + StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id-1"); + StreamsGroupDescribeResponseData.DescribedGroup describedGroup2 = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id-2"); + List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( + describedGroup1, + describedGroup2 + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("streams-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); + + CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>(); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("streams-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(describedGroupFuture); + + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future = + service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Arrays.asList("group-id-1", "group-id-2")); + + assertFalse(future.isDone()); + describedGroupFuture.complete(Collections.singletonList(describedGroup2)); + assertEquals(expectedDescribedGroups, future.get()); + } + + @Test + public void testStreamsGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + int partitionCount = 1; + service.startup(() -> partitionCount); + + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()); + List<StreamsGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()), + describedGroup + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("streams-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup))); + + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future = + service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Arrays.asList("", null)); + + assertEquals(expectedDescribedGroups, future.get()); + } + + @Test + public void testStreamsGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + int partitionCount = 1; + service.startup(() -> partitionCount); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("streams-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + new CoordinatorLoadInProgressException(null) + )); + + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future = + service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Collections.singletonList("group-id")); + + assertEquals( + Collections.singletonList(new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + ), + future.get() + ); + } + + @Test + public void testStreamsGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("streams-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + Errors.COORDINATOR_NOT_AVAILABLE.exception() + )); + + CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future = + service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), Collections.singletonList("group-id")); + + assertEquals( + Collections.singletonList(new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id") + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + ), + future.get() + ); + } + @Test public void testDeleteOffsets() throws Exception { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 2adadc1be4d..e7fffb90c8c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -100,7 +100,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@SuppressWarnings({"ClassFanOutComplexity"}) +@SuppressWarnings("ClassFanOutComplexity") public class GroupCoordinatorShardTest { @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4602593b6e3..bbbfe346539 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; @@ -90,6 +91,10 @@ import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; +import org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers; +import org.apache.kafka.coordinator.group.streams.StreamsGroup; +import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; @@ -8839,6 +8844,113 @@ public class GroupMetadataManagerTest { assertEquals(expected, actual); } + @Test + public void testStreamsGroupDescribeNoErrors() { + List<String> streamsGroupIds = Arrays.asList("group-id-1", "group-id-2"); + int epoch = 10; + String memberId = "member-id"; + StreamsGroupMember.Builder memberBuilder = new StreamsGroupMember.Builder(memberId) + .setClientTags(Collections.singletonMap("clientTag", "clientValue")) + .setProcessId("processId") + .setMemberEpoch(epoch) + .setPreviousMemberEpoch(epoch - 1); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), epoch)) + .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), epoch) + .withMember(memberBuilder.build())) + .build(); + + List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupEpoch(epoch) + .setGroupId(streamsGroupIds.get(0)) + .setGroupState(StreamsGroup.StreamsGroupState.EMPTY.toString()) + .setAssignmentEpoch(0), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupEpoch(epoch) + .setGroupId(streamsGroupIds.get(1)) + .setMembers(Collections.singletonList( + memberBuilder.build().asStreamsGroupDescribeMember( + new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap()) + ) + )) + .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString()) + ); + List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(streamsGroupIds); + + assertEquals(expected, actual); + } + + @Test + public void testStreamsGroupDescribeWithErrors() { + String groupId = "groupId"; + + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + + List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.sendStreamsGroupDescribe(Collections.singletonList(groupId)); + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + List<StreamsGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } + + @Test + public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() { + String streamsGroupId = "streamsGroupId"; + int epoch = 10; + String memberId1 = "memberId1"; + String memberId2 = "memberId2"; + String subtopologyId = "subtopology1"; + + MockTaskAssignor assignor = new MockTaskAssignor("mock"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withTaskAssignors(Collections.singletonList(assignor)) + .build(); + + StreamsGroupMember.Builder memberBuilder1 = new StreamsGroupMember.Builder(memberId1); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder1.build())); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 1)); + + Map<String, Set<Integer>> assignmentMap = new HashMap<>(); + assignmentMap.put(subtopologyId, Collections.emptySet()); + + StreamsGroupMember.Builder memberBuilder2 = new StreamsGroupMember.Builder(memberId2); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId, memberBuilder2.build())); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(streamsGroupId, memberId2, assignmentMap, assignmentMap, assignmentMap)); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(streamsGroupId, memberBuilder2.build())); + context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId, epoch + 2)); + + List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId), context.lastCommittedOffset); + StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(streamsGroupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + assertEquals(1, actual.size()); + assertEquals(describedGroup, actual.get(0)); + + // Commit the offset and test again + context.commit(); + + actual = context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId), context.lastCommittedOffset); + describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(streamsGroupId) + .setMembers(Arrays.asList( + memberBuilder1.build().asStreamsGroupDescribeMember(new org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())), + memberBuilder2.build().asStreamsGroupDescribeMember(new org.apache.kafka.coordinator.group.streams.Assignment(assignmentMap, assignmentMap, assignmentMap)) + )) + .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString()) + .setGroupEpoch(epoch + 2); + assertEquals(1, actual.size()); + assertEquals(describedGroup, actual.get(0)); + } + @Test public void testDescribeGroupStable() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 7e38f61ffd5..45654736316 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; @@ -88,6 +89,18 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; @@ -96,6 +109,7 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; +import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -139,6 +153,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +@SuppressWarnings("CyclomaticComplexity") public class GroupMetadataManagerTestContext { static final String DEFAULT_CLIENT_ID = "client"; static final InetAddress DEFAULT_CLIENT_ADDRESS = InetAddress.getLoopbackAddress(); @@ -417,6 +432,9 @@ public class GroupMetadataManagerTestContext { private final List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>(); private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; + private final List<StreamsGroupBuilder> streamsGroupBuilders = new ArrayList<>(); + private int streamsGroupMaxSize = Integer.MAX_VALUE; + private int streamsGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; private int classicGroupMaxSize = Integer.MAX_VALUE; private int classicGroupInitialRebalanceDelayMs = 3000; private final int classicGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000; @@ -428,7 +446,7 @@ public class GroupMetadataManagerTestContext { private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share"); private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>(); private int shareGroupMaxSize = Integer.MAX_VALUE; - private List<TaskAssignor> taskAssignors = Collections.singletonList(new MockTaskAssignor("mock")); + private List<TaskAssignor> streamsGroupAssignors = Collections.singletonList(new MockTaskAssignor("mock")); public Builder withMetadataImage(MetadataImage metadataImage) { this.metadataImage = metadataImage; @@ -485,6 +503,26 @@ public class GroupMetadataManagerTestContext { return this; } + public Builder withTaskAssignors(List<TaskAssignor> assignors) { + this.streamsGroupAssignors = assignors; + return this; + } + + public Builder withStreamsGroup(StreamsGroupBuilder builder) { + this.streamsGroupBuilders.add(builder); + return this; + } + + public Builder withStreamsGroupMaxSize(int streamsGroupMaxSize) { + this.streamsGroupMaxSize = streamsGroupMaxSize; + return this; + } + + public Builder withStreamsGroupMetadataRefreshIntervalMs(int streamsGroupMetadataRefreshIntervalMs) { + this.streamsGroupMetadataRefreshIntervalMs = streamsGroupMetadataRefreshIntervalMs; + return this; + } + public Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssignor) { this.shareGroupAssignor = shareGroupAssignor; return this; @@ -528,7 +566,11 @@ public class GroupMetadataManagerTestContext { .withShareGroupAssignor(shareGroupAssignor) .withShareGroupMaxSize(shareGroupMaxSize) .withGroupConfigManager(groupConfigManager) - .withStreamsGroupAssignors(taskAssignors) + .withStreamsGroupHeartbeatInterval(5000) + .withStreamsGroupSessionTimeout(45000) + .withStreamsGroupMaxSize(streamsGroupMaxSize) + .withStreamsGroupAssignors(streamsGroupAssignors) + .withStreamsGroupMetadataRefreshIntervalMs(streamsGroupMetadataRefreshIntervalMs) .build(), groupConfigManager, classicGroupInitialRebalanceDelayMs, @@ -538,6 +580,8 @@ public class GroupMetadataManagerTestContext { consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay)); shareGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay)); + streamsGroupBuilders.forEach(builder -> builder.build().forEach(context::replay)); + context.commit(); return context; @@ -1354,6 +1398,10 @@ public class GroupMetadataManagerTestContext { return groupMetadataManager.consumerGroupDescribe(groupIds, lastCommittedOffset); } + public List<StreamsGroupDescribeResponseData.DescribedGroup> sendStreamsGroupDescribe(List<String> groupIds) { + return groupMetadataManager.streamsGroupDescribe(groupIds, lastCommittedOffset); + } + public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds) { return groupMetadataManager.describeGroups(groupIds, lastCommittedOffset); } @@ -1653,6 +1701,48 @@ public class GroupMetadataManagerTestContext { ); break; + case StreamsGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupCurrentMemberAssignmentKey) key.message(), + (StreamsGroupCurrentMemberAssignmentValue) messageOrNull(value) + ); + break; + + case StreamsGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupMemberMetadataKey) key.message(), + (StreamsGroupMemberMetadataValue) messageOrNull(value) + ); + break; + + case StreamsGroupMetadataKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupMetadataKey) key.message(), + (StreamsGroupMetadataValue) messageOrNull(value) + ); + break; + + case StreamsGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupPartitionMetadataKey) key.message(), + (StreamsGroupPartitionMetadataValue) messageOrNull(value) + ); + break; + + case StreamsGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupTargetAssignmentMemberKey) key.message(), + (StreamsGroupTargetAssignmentMemberValue) messageOrNull(value) + ); + break; + + case StreamsGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupTargetAssignmentMetadataKey) key.message(), + (StreamsGroupTargetAssignmentMetadataValue) messageOrNull(value) + ); + break; + case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION: groupMetadataManager.replay( (StreamsGroupTopologyKey) key.message(), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java new file mode 100644 index 00000000000..b07dabc05e1 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.coordinator.group.CoordinatorRecord; + +public class StreamsGroupBuilder { + + private final String groupId; + private final int groupEpoch; + private int assignmentEpoch; + private final Map<String, StreamsGroupMember> members = new HashMap<>(); + private final Map<String, Assignment> assignments = new HashMap<>(); + private Map<String, TopicMetadata> partitionMetadata = new HashMap<>(); + + public StreamsGroupBuilder(String groupId, int groupEpoch) { + this.groupId = groupId; + this.groupEpoch = groupEpoch; + this.assignmentEpoch = 0; + } + + public StreamsGroupBuilder withMember(StreamsGroupMember member) { + this.members.put(member.memberId(), member); + return this; + } + + public StreamsGroupBuilder withPartitionMetadata( + Map<String, TopicMetadata> partitionMetadata) { + this.partitionMetadata = partitionMetadata; + return this; + } + + public StreamsGroupBuilder withAssignment(String memberId, + Map<String, Set<Integer>> assignment) { + this.assignments.put(memberId, new Assignment(assignment)); + return this; + } + + public StreamsGroupBuilder withAssignmentEpoch(int assignmentEpoch) { + this.assignmentEpoch = assignmentEpoch; + return this; + } + + public List<CoordinatorRecord> build() { + List<CoordinatorRecord> records = new ArrayList<>(); + + // Add records for members. + members.forEach((memberId, member) -> + records.add( + CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId, member)) + ); + + if (!partitionMetadata.isEmpty()) { + records.add( + CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, + partitionMetadata)); + } + + // Add group epoch record. + records.add( + CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch)); + + // Add target assignment records. + assignments.forEach((memberId, assignment) -> + records.add( + CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(groupId, memberId, + assignment.activeTasks(), assignment.standbyTasks(), assignment.warmupTasks())) + ); + + // Add target assignment epoch. + records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId, + assignmentEpoch)); + + // Add current assignment records for members. + members.forEach((memberId, member) -> + records.add( + CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId, member)) + ); + + return records; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java index f4e4eccdaa0..35ab964d1b6 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.coordinator.group.streams; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue; import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds; import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue; @@ -50,7 +55,6 @@ public class StreamsGroupMemberTest { .setClientId("client-id") .setClientHost("hostname") .setTopologyId("topology-hash") - .setAssignor("assignor") .setProcessId("process-id") .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)) .setClientTags(mkMap(mkEntry("client", "tag"))) @@ -68,7 +72,6 @@ public class StreamsGroupMemberTest { assertEquals("client-id", member.clientId()); assertEquals("hostname", member.clientHost()); assertEquals("topology-hash", member.topologyId()); - assertEquals("assignor", member.assignor().get()); assertEquals("process-id", member.processId()); assertEquals(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090), member.userEndpoint()); assertEquals( @@ -107,7 +110,6 @@ public class StreamsGroupMemberTest { .setClientId("client-id") .setClientHost("hostname") .setTopologyId("topology-hash") - .setAssignor("assignor") .setProcessId("process-id") .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)) .setClientTags(mkMap(mkEntry("client", "tag"))) @@ -126,7 +128,6 @@ public class StreamsGroupMemberTest { .setClientId("client-id") .setClientHost("hostname") .setTopologyId("topology-hash") - .setAssignor("assignor") .setProcessId("process-id") .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)) .setClientTags(mkMap(mkEntry("client", "tag"))) @@ -145,7 +146,6 @@ public class StreamsGroupMemberTest { .setClientId("client-id") .setClientHost("hostname") .setTopologyId("topology-hash") - .setAssignor("assignor") .setProcessId("process-id") .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)) .setClientTags(mkMap(mkEntry("client", "tag"))) @@ -173,7 +173,6 @@ public class StreamsGroupMemberTest { .setClientId("client-id") .setClientHost("hostname") .setTopologyId("topology-hash") - .setAssignor("assignor") .setProcessId("process-id") .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)) .setClientTags(mkMap(mkEntry("client", "tag"))) @@ -188,7 +187,6 @@ public class StreamsGroupMemberTest { .maybeUpdateRackId(Optional.empty()) .maybeUpdateInstanceId(Optional.empty()) .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty()) - .maybeUpdateAssignor(Optional.empty()) .build(); assertEquals(member, updatedMember); @@ -197,13 +195,11 @@ public class StreamsGroupMemberTest { .maybeUpdateRackId(Optional.of("new-rack-id")) .maybeUpdateInstanceId(Optional.of("new-instance-id")) .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000)) - .maybeUpdateAssignor(Optional.of("new-assignor")) .build(); assertEquals("new-instance-id", updatedMember.instanceId()); assertEquals("new-rack-id", updatedMember.rackId()); assertEquals(6000, updatedMember.rebalanceTimeoutMs()); - assertEquals("new-assignor", updatedMember.assignor().get()); } @Test @@ -275,4 +271,117 @@ public class StreamsGroupMemberTest { assertEquals(mkAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9)), member.assignedWarmupTasks()); assertEquals(mkAssignment(mkTaskAssignment(subtopologyId2, 2, 3, 1)), member.activeTasksPendingRevocation()); } + + @Test + public void testAsStreamsGroupDescribeMember() { + String subTopology1 = Uuid.randomUuid().toString(); + String subTopology2 = Uuid.randomUuid().toString(); + String subTopology3 = Uuid.randomUuid().toString(); + String subTopology4 = Uuid.randomUuid().toString(); + List<Integer> assignedTasks1 = Arrays.asList(0, 1, 2); + List<Integer> assignedTasks2 = Arrays.asList(3, 4, 5); + List<Integer> assignedTasks3 = Arrays.asList(6, 7, 8); + List<Integer> assignedTasks4 = Arrays.asList(5, 6, 7); + int epoch = 10; + StreamsGroupCurrentMemberAssignmentValue record = new StreamsGroupCurrentMemberAssignmentValue() + .setMemberEpoch(epoch) + .setPreviousMemberEpoch(epoch - 1) + .setActiveTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subTopology1) + .setPartitions(assignedTasks1))) + .setStandbyTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subTopology2) + .setPartitions(assignedTasks2))) + .setWarmupTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subTopology3) + .setPartitions(assignedTasks3))) + .setActiveTasksPendingRevocation(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds() + .setSubtopology(subTopology4) + .setPartitions(assignedTasks4))); + String memberId = Uuid.randomUuid().toString(); + String clientId = "clientId"; + String instanceId = "instanceId"; + String rackId = "rackId"; + String clientHost = "clientHost"; + String processId = "processId"; + String topologyId = "topologyId"; + Map<String, String> clientTags = Collections.singletonMap("key", "value"); + org.apache.kafka.coordinator.group.streams.Assignment targetAssignment = new org.apache.kafka.coordinator.group.streams.Assignment( + mkMap(mkEntry(subTopology1, new HashSet<>(assignedTasks3))), + mkMap(mkEntry(subTopology2, new HashSet<>(assignedTasks2))), + mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks1))) + ); + StreamsGroupMember member = new StreamsGroupMember.Builder(memberId) + .updateWith(record) + .setClientId(clientId) + .setInstanceId(instanceId) + .setRackId(rackId) + .setClientHost(clientHost) + .setProcessId(processId) + .setTopologyId(topologyId) + .setClientTags(clientTags) + .setAssignedActiveTasks( + mkMap(mkEntry(subTopology1, new HashSet<>(assignedTasks1))) + ) + .setAssignedStandbyTasks( + mkMap(mkEntry(subTopology2, new HashSet<>(assignedTasks2))) + ) + .setAssignedWarmupTasks( + mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks3))) + ) + .setActiveTasksPendingRevocation( + mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks4))) + ) + .build(); + + StreamsGroupDescribeResponseData.Member actual = member.asStreamsGroupDescribeMember(targetAssignment); + StreamsGroupDescribeResponseData.Member expected = new StreamsGroupDescribeResponseData.Member() + .setMemberId(memberId) + .setMemberEpoch(epoch) + .setClientId(clientId) + .setInstanceId(instanceId) + .setRackId(rackId) + .setClientHost(clientHost) + .setProcessId(processId) + .setTopologyId(topologyId) + .setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue().setKey("key").setValue("value"))) + .setAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology1) + .setPartitions(assignedTasks1))) + .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology2) + .setPartitions(assignedTasks2))) + .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology3) + .setPartitions(assignedTasks3))) + ) + .setTargetAssignment( + new StreamsGroupDescribeResponseData.Assignment() + .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology1) + .setPartitions(assignedTasks3))) + .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology2) + .setPartitions(assignedTasks2))) + .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds() + .setSubtopology(subTopology3) + .setPartitions(assignedTasks1))) + ); + // TODO: Add TaskOffsets + + assertEquals(expected, actual); + } + + @Test + public void testAsStreamsGroupDescribeWithTargetAssignmentNull() { + StreamsGroupMember member = new StreamsGroupMember.Builder(Uuid.randomUuid().toString()) + .build(); + + StreamsGroupDescribeResponseData.Member streamsGroupDescribeMember = member.asStreamsGroupDescribeMember( + null); + + assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment()); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index c142c9b9cd7..3da224f3a95 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -88,9 +88,7 @@ public class StreamsGroupTest { member = streamsGroup.getOrMaybeCreateMember("member", true); - member = new StreamsGroupMember.Builder(member) - .setAssignor("client") - .build(); + member = new StreamsGroupMember.Builder(member).build(); streamsGroup.updateMember(member); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java new file mode 100644 index 00000000000..7ef086c9be5 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; +import org.junit.jupiter.api.Test; + +public class StreamsTopologyTest { + + @Test + public void streamsTopologyIdShouldBeCorrect() { + StreamsTopology topology = new StreamsTopology("topology-id", Collections.emptyMap()); + assertEquals("topology-id", topology.topologyId()); + } + + @Test + public void subtopologiesShouldBeCorrect() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + ); + StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); + assertEquals(subtopologies, topology.subtopologies()); + } + + @Test + public void topicSubscriptionShouldBeCorrect() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology() + .setSourceTopics(Arrays.asList("source-topic-1", "source-topic-2")) + .setRepartitionSourceTopics(Arrays.asList( + new TopicInfo().setName("repartition-topic-1"), + new TopicInfo().setName("repartition-topic-2") + )) + ), + mkEntry("subtopology-2", new Subtopology() + .setSourceTopics(Arrays.asList("source-topic-3", "source-topic-4")) + .setRepartitionSourceTopics(Arrays.asList( + new TopicInfo().setName("repartition-topic-3"), + new TopicInfo().setName("repartition-topic-4") + )) + ) + ); + StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); + Set<String> expectedTopics = new HashSet<>(Arrays.asList( + "source-topic-1", "source-topic-2", "repartition-topic-1", "repartition-topic-2", + "source-topic-3", "source-topic-4", "repartition-topic-3", "repartition-topic-4" + )); + assertEquals(expectedTopics, topology.topicSubscription()); + } + + @Test + public void fromRecordShouldCreateCorrectTopology() { + StreamsGroupTopologyValue record = new StreamsGroupTopologyValue() + .setTopologyId("topology-id") + .setTopology(Arrays.asList( + new Subtopology().setSubtopology("subtopology-1"), + new Subtopology().setSubtopology("subtopology-2") + )); + StreamsTopology topology = StreamsTopology.fromRecord(record); + assertEquals("topology-id", topology.topologyId()); + assertEquals(2, topology.subtopologies().size()); + assertTrue(topology.subtopologies().containsKey("subtopology-1")); + assertTrue(topology.subtopologies().containsKey("subtopology-2")); + } + + @Test + public void equalsShouldReturnTrueForEqualTopologies() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + ); + StreamsTopology topology1 = new StreamsTopology("topology-id", subtopologies); + StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies); + assertEquals(topology1, topology2); + } + + @Test + public void equalsShouldReturnFalseForDifferentTopologies() { + Map<String, Subtopology> subtopologies1 = mkMap( + mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")) + ); + Map<String, Subtopology> subtopologies2 = mkMap( + mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + ); + StreamsTopology topology1 = new StreamsTopology("topology-id-1", subtopologies1); + StreamsTopology topology2 = new StreamsTopology("topology-id-2", subtopologies2); + assertNotEquals(topology1, topology2); + } + + @Test + public void hashCodeShouldBeConsistentWithEquals() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + ); + StreamsTopology topology1 = new StreamsTopology("topology-id", subtopologies); + StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies); + assertEquals(topology1.hashCode(), topology2.hashCode()); + } + + @Test + public void toStringShouldReturnCorrectRepresentation() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology().setSubtopology("subtopology-1")), + mkEntry("subtopology-2", new Subtopology().setSubtopology("subtopology-2")) + ); + StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); + String expectedString = "StreamsTopology{topologyId=topology-id, subtopologies=" + subtopologies + "}"; + assertEquals(expectedString, topology.toString()); + } + + @Test + public void asStreamsGroupDescribeTopologyShouldReturnCorrectSubtopologies() { + Map<String, Subtopology> subtopologies = mkMap( + mkEntry("subtopology-1", new Subtopology() + .setSourceTopicRegex("regex-1") + .setSubtopology("subtopology-1") + .setSourceTopics(Collections.singletonList("source-topic-1")) + .setRepartitionSinkTopics(Collections.singletonList("sink-topic-1")) + .setRepartitionSourceTopics( + Collections.singletonList(new TopicInfo().setName("repartition-topic-1"))) + .setStateChangelogTopics( + Collections.singletonList(new TopicInfo().setName("changelog-topic-1"))) + ), + mkEntry("subtopology-2", new Subtopology() + .setSourceTopicRegex("regex-2") + .setSubtopology("subtopology-2") + .setSourceTopics(Collections.singletonList("source-topic-2")) + .setRepartitionSinkTopics(Collections.singletonList("sink-topic-2")) + .setRepartitionSourceTopics( + Collections.singletonList(new TopicInfo().setName("repartition-topic-2"))) + .setStateChangelogTopics( + Collections.singletonList(new TopicInfo().setName("changelog-topic-2"))) + ) + ); + StreamsTopology topology = new StreamsTopology("topology-id", subtopologies); + List<StreamsGroupDescribeResponseData.Subtopology> result = topology.asStreamsGroupDescribeTopology(); + assertEquals(2, result.size()); + assertEquals("regex-1", result.get(0).sourceTopicRegex()); + assertEquals("subtopology-1", result.get(0).subtopology()); + assertEquals(Collections.singletonList("source-topic-1"), result.get(0).sourceTopics()); + assertEquals(Collections.singletonList("sink-topic-1"), result.get(0).repartitionSinkTopics()); + assertEquals("repartition-topic-1", result.get(0).repartitionSourceTopics().get(0).name()); + assertEquals("changelog-topic-1", result.get(0).stateChangelogTopics().get(0).name()); + assertEquals("regex-2", result.get(1).sourceTopicRegex()); + assertEquals("subtopology-2", result.get(1).subtopology()); + assertEquals(Collections.singletonList("source-topic-2"), result.get(1).sourceTopics()); + assertEquals(Collections.singletonList("sink-topic-2"), result.get(1).repartitionSinkTopics()); + assertEquals("repartition-topic-2", result.get(1).repartitionSourceTopics().get(0).name()); + assertEquals("changelog-topic-2", result.get(1).stateChangelogTopics().get(0).name()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index b16664b8f30..d97832eb969 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -225,8 +225,7 @@ public class TargetAssignmentBuilderTest { .withTopology(topology) .withStaticMembers(staticMembers) .withSubscriptionMetadata(subscriptionMetadata) - .withTargetAssignment(targetAssignment) - .withTopicsImage(topicsImage); + .withTargetAssignment(targetAssignment); // Add the updated members or delete the deleted members. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { @@ -262,12 +261,10 @@ public class TargetAssignmentBuilderTest { .topics(); final Map<String, String> clientTags = mkMap(mkEntry("tag1", "value1"), mkEntry("tag2", "value2")); - final Map<String, String> assignmentConfigs = mkMap(mkEntry("conf1", "value1"), mkEntry("conf2", "value2")); StreamsGroupMember member = new StreamsGroupMember.Builder("member-id") .setRackId("rackId") .setInstanceId("instanceId") .setProcessId("processId") - .setAssignor("assignor") .setClientTags(clientTags) .build();
