This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit fd4600b9b621603a0f443f7a284bfbd8fbb35640 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Jun 3 17:23:43 2024 +0200 Implement InitStreamsApp RPC in the group coordinator Implement init streams app call in group coordinator without creating internal topics. Create topology metadata record Verify existence of all required internal topics See https://github.com/lucasbru/kafka/pull/15 --- .../group/GroupCoordinatorAdapter.scala | 11 +- core/src/main/scala/kafka/server/KafkaApis.scala | 28 +++++ .../group/GroupCoordinatorAdapterTest.scala | 18 ++- .../scala/unit/kafka/server/KafkaApisTest.scala | 62 +++++++++ .../kafka/coordinator/group/GroupCoordinator.java | 16 +++ .../group/GroupCoordinatorRecordHelpers.java | 42 +++++++ .../coordinator/group/GroupCoordinatorService.java | 31 +++++ .../coordinator/group/GroupCoordinatorShard.java | 18 +++ .../coordinator/group/GroupMetadataManager.java | 116 +++++++++++++++++ .../group/GroupCoordinatorRecordHelpersTest.java | 82 ++++++++++++ .../group/GroupCoordinatorServiceTest.java | 115 +++++++++++++++++ .../group/GroupCoordinatorShardTest.java | 34 +++++ .../group/GroupMetadataManagerTest.java | 140 +++++++++++++++++++++ .../group/GroupMetadataManagerTestContext.java | 42 +++++++ 14 files changed, 753 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 01756550569..47955e727a0 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata import kafka.server.{KafkaConfig, ReplicaManager} import kafka.utils.Implicits.MapExtensionMethods import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] +import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch @@ -78,6 +78,15 @@ private[group] class GroupCoordinatorAdapter( )) } + override def streamsInitialize( + context: RequestContext, + request: StreamsInitializeRequestData + ): CompletableFuture[StreamsInitializeResponseData] = { + FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( + s"The old group coordinator does not support ${ApiKeys.STREAMS_INITIALIZE.name} API." + )) + } + override def shareGroupHeartbeat( context: RequestContext, request: ShareGroupHeartbeatRequestData diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2acface315d..acad87c7c66 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -275,6 +275,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) + case ApiKeys.STREAMS_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3865,6 +3866,33 @@ class KafkaApis(val requestChannel: RequestChannel, } + def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { + val streamsInitializeRequest = request.body[StreamsInitializeRequest] + + // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, streamsInitializeRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + groupCoordinator.streamsInitialize( + request.context, + streamsInitializeRequest.data, + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new StreamsInitializeResponse(response)) + } + } + } + } + def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index d88ee68abcf..0b4db0f3245 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException} -import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...] +import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...] import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} @@ -78,6 +78,22 @@ class GroupCoordinatorAdapterTest { assertFutureThrows(future, classOf[UnsupportedVersionException]) } + @Test + def testStreamsInitialize(): Unit = { + val groupCoordinator = mock(classOf[GroupCoordinator]) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) + + val ctx = makeContext(ApiKeys.STREAMS_INITIALIZE, ApiKeys.STREAMS_INITIALIZE.latestVersion) + val request = new StreamsInitializeRequestData() + .setGroupId("group") + + val future = adapter.streamsInitialize(ctx, request) + + assertTrue(future.isDone) + assertTrue(future.isCompletedExceptionally) + assertFutureThrows(future, classOf[UnsupportedVersionException]) + } + @Test def testJoinShareGroup(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index b978389743b..4b5d6ae84d5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11175,6 +11175,68 @@ class KafkaApisTest extends Logging { val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + + def testStreamsInitializeRequest(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val future = new CompletableFuture[StreamsInitializeResponseData]() + when(groupCoordinator.streamsInitialize( + requestChannelRequest.context, + streamsInitializeRequest + )).thenReturn(future) + kafkaApis = createKafkaApis(overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" + )) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val streamsInitializeResponse = new StreamsInitializeResponseData() + + future.complete(streamsInitializeResponse) + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(streamsInitializeResponse, response.data) + } + + @Test + def testStreamsInitializeRequestFutureFailed(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val future = new CompletableFuture[StreamsInitializeResponseData]() + when(groupCoordinator.streamsInitialize( + requestChannelRequest.context, + streamsInitializeRequest + )).thenReturn(future) + kafkaApis = createKafkaApis(overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" + )) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode) + } + + @Test + def testStreamsInitializeRequestAuthorizationFailed(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) + } @ParameterizedTest @ValueSource(booleans = Array(true, false)) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 87efb530dc8..f4db71e65f6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -82,6 +84,20 @@ public interface GroupCoordinator { ConsumerGroupHeartbeatRequestData request ); + /** + * Initialize a Streams Group. + * + * @param context The request context. + * @param request The StreamsHeartbeatRequest data. + * + * @return A future yielding the response. + * The error code(s) of the response are set to indicate the error(s) occurred during the execution. + */ + CompletableFuture<StreamsInitializeResponseData> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ); + /** * Heartbeat to a Share Group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 67364470b0f..4424c161a3b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; @@ -49,6 +50,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; @@ -60,11 +63,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * This class contains helper methods to create records stored in * the __consumer_offsets topic. */ +@SuppressWarnings("ClassDataAbstractionCoupling") public class GroupCoordinatorRecordHelpers { private GroupCoordinatorRecordHelpers() {} @@ -908,4 +913,41 @@ public class GroupCoordinatorRecordHelpers { ); return topics; } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param subtopologies The subtopologies in the new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + List<StreamsInitializeRequestData.Subtopology> subtopologies) { + StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); + subtopologies.forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream() + .map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()); + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs) + .setPartitions(topicInfo.partitions()); + }).collect(Collectors.toList()); + + List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()); + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); + }).collect(Collectors.toList()); + + value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology()) + .setSourceTopics(subtopology.sourceTopics()).setSinkTopics(subtopology.sinkTopics()) + .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); + }); + + return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 15), + new ApiMessageAndVersion(value, (short) 0)); + } + } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 9ac43783be6..28a2b73ef1a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -44,6 +44,8 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -342,6 +344,35 @@ public class GroupCoordinatorService implements GroupCoordinator { )); } + /** + * See {@link GroupCoordinator#streamsInitialize(RequestContext, StreamsInitializeRequestData)}. + */ + @Override + public CompletableFuture<StreamsInitializeResponseData> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(new StreamsInitializeResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + return runtime.scheduleWriteOperation( + "streams-group-initialize", + topicPartitionFor(request.groupId()), + Duration.ofMillis(config.offsetCommitTimeoutMs), + coordinator -> coordinator.streamsInitialize(context, request) + ).exceptionally(exception -> handleOperationException( + "streams-group-initialize", + request, + exception, + (error, message) -> new StreamsInitializeResponseData() + .setErrorCode(error.code()) + .setErrorMessage(message) + )); + } + /** * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index c6eced2158e..98965c28c1b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -333,6 +335,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord return groupMetadataManager.consumerGroupHeartbeat(context, request); } + /** + * Handles a StreamsInitialize request. + * + * @param context The request context. + * @param request The actual StreamsInitialize request. + * + * @return A Result containing the StreamsInitialize response and + * a list of records to update the state machine. + */ + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) { + return groupMetadataManager.streamsInitialize(context, request); + } + /** * Handles a ShareGroupHeartbeat request. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index ecf7efaad49..812b87aaaac 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -52,6 +52,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.Errors; @@ -99,6 +101,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; @@ -140,6 +144,7 @@ import java.util.stream.Stream; import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; +import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; @@ -161,6 +166,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.Utils.assignmentToString; import static org.apache.kafka.coordinator.group.Utils.ofSentinel; import static org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment; @@ -1332,7 +1338,29 @@ public class GroupMetadataManager { throw new InvalidRequestException("MemberEpoch is invalid."); } } + + /** + * Validates the request. + * + * @param request The request to validate. + * + * @throws InvalidRequestException if the request is not valid. + * @throws UnsupportedAssignorException if the assignor is not supported. + */ + private void throwIfStreamsInitializeRequestIsInvalid( + StreamsInitializeRequestData request + ) throws InvalidRequestException, UnsupportedAssignorException { + throwIfEmptyString(request.groupId(), "GroupId can't be empty."); + + if (request.topology().isEmpty()) { + throw new InvalidRequestException("There must be at least one subtopology."); + } + + // TODO: Check invariants + } + + /** /** * Verifies that the partitions currently owned by the member (the ones set in the * request) matches the ones that the member should own. It matches if the consumer @@ -1864,6 +1892,54 @@ public class GroupMetadataManager { return new CoordinatorResult<>(records, response); } + + /** + * Handles the initialization of the topology information on the broker side, that will be reused by all members of the group. + * + * @param groupId The group id from the request. + * @param subtopologies The list of subtopologies + * @return A Result containing the StreamsInitialize response and a list of records to update the state machine. + */ + private CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(String groupId, + List<StreamsInitializeRequestData.Subtopology> subtopologies) + throws ApiException { + final List<CoordinatorRecord> records = new ArrayList<>(); + + // TODO: Throw if group does not exist or is not a streams group. Needs model of + // similar to final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists, records); + // throwIfNull(group); + + // TODO: For the POC, only check if internal topics exist + Set<String> missingTopics = new HashSet<>(); + for (StreamsInitializeRequestData.Subtopology subtopology : subtopologies) { + for (StreamsInitializeRequestData.TopicInfo topic : subtopology.stateChangelogTopics()) { + if (metadataImage.topics().getTopic(topic.name()) == null) { + missingTopics.add(topic.name()); + } + } + for (StreamsInitializeRequestData.TopicInfo topic : subtopology.repartitionSourceTopics()) { + if (metadataImage.topics().getTopic(topic.name()) == null) { + missingTopics.add(topic.name()); + } + } + } + if (!missingTopics.isEmpty()) { + StreamsInitializeResponseData response = + new StreamsInitializeResponseData() + .setErrorCode(STREAMS_INVALID_TOPOLOGY.code()) + .setErrorMessage("Internal topics " + String.join(", ", missingTopics) + " do not exist."); + + return new CoordinatorResult<>(records, response); + } else { + records.add(newStreamsGroupTopologyRecord(groupId, subtopologies)); + + StreamsInitializeResponseData response = new StreamsInitializeResponseData(); + + return new CoordinatorResult<>(records, response); + } + + } + /** * Handle a JoinGroupRequest to a ConsumerGroup. * @@ -3171,6 +3247,46 @@ public class GroupMetadataManager { } } + /** + * Handles a ConsumerGroupHeartbeat request. + * + * @param context The request context. + * @param request The actual ConsumerGroupHeartbeat request. + * + * @return A Result containing the ConsumerGroupHeartbeat response and + * a list of records to update the state machine. + */ + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) throws ApiException { + throwIfStreamsInitializeRequestIsInvalid(request); + + return streamsInitialize( + request.groupId(), + request.topology() + ); + } + + /** + * Replays StreamsGroupTopologyKey/Value to update the hard state of + * the streams group. + * + * @param key A StreamsGroupTopologyKey key. + * @param value A StreamsGroupTopologyValue record. + */ + public void replay( + StreamsGroupTopologyKey key, + StreamsGroupTopologyValue value + ) { + + // TODO: Insert the topology information to the in-memory representation. Needs the notion + // of a Streams group +// String groupId = key.groupId(); +// StreamsGroup streamsGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); +// streamsGroup.setTopology(value); + } + /** * Handles a ShareGroupHeartbeat request. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 38e2b9d68cf..7152dea8ffe 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; +import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -43,6 +44,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.TopicMetadata; @@ -87,6 +90,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; @@ -213,6 +217,84 @@ public class GroupCoordinatorRecordHelpersTest { )); } + @Test + public void testNewStreamsGroupTopologyRecord() { + List<StreamsInitializeRequestData.Subtopology> topology = + Collections.singletonList(new StreamsInitializeRequestData.Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new StreamsInitializeRequestData.TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new StreamsInitializeRequestData.TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new StreamsInitializeRequestData.TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new StreamsInitializeRequestData.TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + + List<StreamsGroupTopologyValue.Subtopology> expectedTopology = + Collections.singletonList(new StreamsGroupTopologyValue.Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + + CoordinatorRecord expectedRecord = new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupTopologyKey() + .setGroupId("group-id"), + (short) 15), + new ApiMessageAndVersion( + new StreamsGroupTopologyValue() + .setTopology(expectedTopology), + (short) 0)); + + assertEquals(expectedRecord, newStreamsGroupTopologyRecord( + "group-id", + topology + )); + } + @Test public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() { Uuid fooTopicId = Uuid.randomUuid(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 1056d80dd38..c3d13d28857 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; @@ -52,6 +53,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -254,6 +257,118 @@ public class GroupCoordinatorServiceTest { ); } + @Test + public void testStreamsInitializeWhenNotStarted() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), + future.get() + ); + } + + @Test + public void testStreamsInitialize() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + service.startup(() -> 1); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("streams-group-initialize"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsInitializeResponseData() + )); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals(new StreamsInitializeResponseData(), future.get(5, TimeUnit.SECONDS)); + } + + private static Stream<Arguments> testStreamsInitializeWithExceptionSource() { + return Stream.of( + Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null), + Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null), + Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid"), + Arguments.arguments(new StreamsInvalidTopologyException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid") + ); + } + + @ParameterizedTest + @MethodSource("testStreamsInitializeWithExceptionSource") + public void testStreamsInitializeWithException( + Throwable exception, + short expectedErrorCode, + String expectedErrorMessage + ) throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + service.startup(() -> 1); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("streams-group-initialize"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(exception)); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(expectedErrorCode) + .setErrorMessage(expectedErrorMessage), + future.get(5, TimeUnit.SECONDS) + ); + } + @Test public void testPartitionFor() { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 9cf3de6f821..65fb545f641 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -126,6 +128,38 @@ public class GroupCoordinatorShardTest { assertEquals(result, coordinator.consumerGroupHeartbeat(context, request)); } + @Test + public void testStreamsInitialize() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + RequestContext context = requestContext(ApiKeys.STREAMS_INITIALIZE); + StreamsInitializeRequestData request = new StreamsInitializeRequestData(); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = new CoordinatorResult<>( + Collections.emptyList(), + new StreamsInitializeResponseData() + ); + + when(groupMetadataManager.streamsInitialize( + context, + request + )).thenReturn(result); + + assertEquals(result, coordinator.streamsInitialize(context, request)); + } + @Test public void testCommitOffset() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 653662b859b..3a574798241 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -53,6 +53,11 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology; +import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig; +import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicInfo; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -14745,4 +14750,139 @@ public class GroupMetadataManagerTest { assertEquals(expectedSuccessCount, successCount); return memberIds; } + + @Test + public void testTopologyInitialization() { + String groupId = "fooup"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "repartition"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "changelog"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.consumerGroup(groupId)); + + final List<Subtopology> topology = Collections.singletonList( + new Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = + context.streamsInitialize( + new StreamsInitializeRequestData() + .setGroupId(groupId) + .setTopology(topology) + ); + + assertEquals( + new StreamsInitializeResponseData(), + result.response() + ); + + List<CoordinatorRecord> expectedRecords = Arrays.asList( + CoordinatorRecordHelpers.newStreamsGroupTopologyRecord( + groupId, + topology + ) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testTopologyInitializationMissingInternalTopics() { + String groupId = "fooup"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "repartition"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.consumerGroup(groupId)); + + final List<Subtopology> topology = Collections.singletonList( + new Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = + context.streamsInitialize( + new StreamsInitializeRequestData() + .setGroupId(groupId) + .setTopology(topology) + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) + .setErrorMessage("Internal topics changelog do not exist."), + result.response() + ); + + assertTrue(result.records().isEmpty()); + + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index b9ba9410497..a2dde4cc33c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.network.ClientInformation; @@ -81,6 +83,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; @@ -657,6 +661,37 @@ public class GroupMetadataManagerTestContext { return result; } + + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + StreamsInitializeRequestData request + ) { + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.STREAMS_INITIALIZE, + ApiKeys.STREAMS_INITIALIZE.latestVersion(), + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsInitialize( + context, + request + ); + + if (result.replayRecords()) { + result.records().forEach(this::replay); + } + return result; + } + public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) { time.sleep(ms); List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll(); @@ -1550,6 +1585,13 @@ public class GroupMetadataManagerTestContext { ); break; + case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupTopologyKey) key.message(), + (StreamsGroupTopologyValue) messageOrNull(value) + ); + break; + default: throw new IllegalStateException("Received an unknown record type " + key.version() + " in " + record);
