This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c1a597b211e1583ec9a12020b3c45e4d7030cc36 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon Jun 3 17:23:43 2024 +0200 Resolve merge conflict from 11/25 trunk rebase Implement InitStreamsApp RPC in the group coordinator Implement init streams app call in group coordinator without creating internal topics. Create topology metadata record Verify existence of all required internal topics See https://github.com/lucasbru/kafka/pull/15 --- .../group/GroupCoordinatorAdapter.scala | 11 +- core/src/main/scala/kafka/server/KafkaApis.scala | 28 +++++ .../group/GroupCoordinatorAdapterTest.scala | 18 ++- .../scala/unit/kafka/server/KafkaApisTest.scala | 62 +++++++++ .../kafka/coordinator/group/GroupCoordinator.java | 16 +++ .../group/GroupCoordinatorRecordHelpers.java | 42 +++++++ .../coordinator/group/GroupCoordinatorService.java | 31 +++++ .../coordinator/group/GroupCoordinatorShard.java | 18 +++ .../coordinator/group/GroupMetadataManager.java | 116 +++++++++++++++++ .../group/GroupCoordinatorRecordHelpersTest.java | 82 ++++++++++++ .../group/GroupCoordinatorServiceTest.java | 115 +++++++++++++++++ .../group/GroupCoordinatorShardTest.java | 34 +++++ .../group/GroupMetadataManagerTest.java | 140 +++++++++++++++++++++ .../group/GroupMetadataManagerTestContext.java | 42 +++++++ 14 files changed, 753 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 970d283953e..d45221d1e71 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -18,7 +18,7 @@ package kafka.coordinator.group import kafka.server.{KafkaConfig, ReplicaManager} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] +import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch @@ -77,6 +77,15 @@ private[group] class GroupCoordinatorAdapter( )) } + override def streamsInitialize( + context: RequestContext, + request: StreamsInitializeRequestData + ): CompletableFuture[StreamsInitializeResponseData] = { + FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( + s"The old group coordinator does not support ${ApiKeys.STREAMS_INITIALIZE.name} API." + )) + } + override def shareGroupHeartbeat( context: RequestContext, request: ShareGroupHeartbeatRequestData diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 21942229d87..046f1570707 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -276,6 +276,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) + case ApiKeys.STREAMS_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3883,6 +3884,33 @@ class KafkaApis(val requestChannel: RequestChannel, } + def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { + val streamsInitializeRequest = request.body[StreamsInitializeRequest] + + // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, streamsInitializeRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + groupCoordinator.streamsInitialize( + request.context, + streamsInitializeRequest.data, + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle(request, new StreamsInitializeResponse(response)) + } + } + } + } + def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = { val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest] diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 7a9de453740..0d57eea15f9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -19,7 +19,7 @@ package kafka.coordinator.group import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException} -import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...] +import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestDa [...] import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection} @@ -78,6 +78,22 @@ class GroupCoordinatorAdapterTest { assertFutureThrows(future, classOf[UnsupportedVersionException]) } + @Test + def testStreamsInitialize(): Unit = { + val groupCoordinator = mock(classOf[GroupCoordinator]) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) + + val ctx = makeContext(ApiKeys.STREAMS_INITIALIZE, ApiKeys.STREAMS_INITIALIZE.latestVersion) + val request = new StreamsInitializeRequestData() + .setGroupId("group") + + val future = adapter.streamsInitialize(ctx, request) + + assertTrue(future.isDone) + assertTrue(future.isCompletedExceptionally) + assertFutureThrows(future, classOf[UnsupportedVersionException]) + } + @Test def testJoinShareGroup(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5771a17d5ea..4c0f3421e3f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11233,6 +11233,68 @@ class KafkaApisTest extends Logging { val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + + def testStreamsInitializeRequest(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val future = new CompletableFuture[StreamsInitializeResponseData]() + when(groupCoordinator.streamsInitialize( + requestChannelRequest.context, + streamsInitializeRequest + )).thenReturn(future) + kafkaApis = createKafkaApis(overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" + )) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val streamsInitializeResponse = new StreamsInitializeResponseData() + + future.complete(streamsInitializeResponse) + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(streamsInitializeResponse, response.data) + } + + @Test + def testStreamsInitializeRequestFutureFailed(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val future = new CompletableFuture[StreamsInitializeResponseData]() + when(groupCoordinator.streamsInitialize( + requestChannelRequest.context, + streamsInitializeRequest + )).thenReturn(future) + kafkaApis = createKafkaApis(overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true" + )) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception) + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.errorCode) + } + + @Test + def testStreamsInitializeRequestAuthorizationFailed(): Unit = { + val streamsInitializeRequest = new StreamsInitializeRequestData().setGroupId("group") + + val requestChannelRequest = buildRequest(new StreamsInitializeRequest.Builder(streamsInitializeRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true") + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[StreamsInitializeResponse](requestChannelRequest) + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) + } @ParameterizedTest @ValueSource(booleans = Array(true, false)) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 223cff9720e..8a5032451b1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -82,6 +84,20 @@ public interface GroupCoordinator { ConsumerGroupHeartbeatRequestData request ); + /** + * Initialize a Streams Group. + * + * @param context The request context. + * @param request The StreamsHeartbeatRequest data. + * + * @return A future yielding the response. + * The error code(s) of the response are set to indicate the error(s) occurred during the execution. + */ + CompletableFuture<StreamsInitializeResponseData> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ); + /** * Heartbeat to a Share Group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 05cf4d5bd3a..98cf7f937e8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; @@ -51,6 +52,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; @@ -63,11 +66,13 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * This class contains helper methods to create records stored in * the __consumer_offsets topic. */ +@SuppressWarnings("ClassDataAbstractionCoupling") public class GroupCoordinatorRecordHelpers { private GroupCoordinatorRecordHelpers() {} @@ -937,4 +942,41 @@ public class GroupCoordinatorRecordHelpers { ); return topics; } + + /** + * Creates a StreamsTopology record. + * + * @param groupId The consumer group id. + * @param subtopologies The subtopologies in the new topology. + * @return The record. + */ + public static CoordinatorRecord newStreamsGroupTopologyRecord(String groupId, + List<StreamsInitializeRequestData.Subtopology> subtopologies) { + StreamsGroupTopologyValue value = new StreamsGroupTopologyValue(); + subtopologies.forEach(subtopology -> { + List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics = subtopology.repartitionSourceTopics().stream() + .map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()); + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs) + .setPartitions(topicInfo.partitions()); + }).collect(Collectors.toList()); + + List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = subtopology.stateChangelogTopics().stream().map(topicInfo -> { + List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = topicInfo.topicConfigs().stream() + .map(config -> new StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value())) + .collect(Collectors.toList()); + return new StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs); + }).collect(Collectors.toList()); + + value.topology().add(new StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology()) + .setSourceTopics(subtopology.sourceTopics()).setSinkTopics(subtopology.sinkTopics()) + .setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics)); + }); + + return new CoordinatorRecord(new ApiMessageAndVersion(new StreamsGroupTopologyKey().setGroupId(groupId), (short) 15), + new ApiMessageAndVersion(value, (short) 0)); + } + } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 96d0253a09e..2a3eef88a04 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -44,6 +44,8 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -344,6 +346,35 @@ public class GroupCoordinatorService implements GroupCoordinator { )); } + /** + * See {@link GroupCoordinator#streamsInitialize(RequestContext, StreamsInitializeRequestData)}. + */ + @Override + public CompletableFuture<StreamsInitializeResponseData> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(new StreamsInitializeResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + return runtime.scheduleWriteOperation( + "streams-group-initialize", + topicPartitionFor(request.groupId()), + Duration.ofMillis(config.offsetCommitTimeoutMs), + coordinator -> coordinator.streamsInitialize(context, request) + ).exceptionally(exception -> handleOperationException( + "streams-group-initialize", + request, + exception, + (error, message) -> new StreamsInitializeResponseData() + .setErrorCode(error.code()) + .setErrorMessage(message) + )); + } + /** * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 099755710b9..b002f96b60b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -361,6 +363,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord return groupMetadataManager.consumerGroupHeartbeat(context, request); } + /** + * Handles a StreamsInitialize request. + * + * @param context The request context. + * @param request The actual StreamsInitialize request. + * + * @return A Result containing the StreamsInitialize response and + * a list of records to update the state machine. + */ + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) { + return groupMetadataManager.streamsInitialize(context, request); + } + /** * Handles a ShareGroupHeartbeat request. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5a7e98ea3cc..c7cf02f7974 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.Errors; @@ -103,6 +105,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; @@ -150,6 +154,7 @@ import java.util.stream.Stream; import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; +import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; @@ -172,6 +177,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord; import static org.apache.kafka.coordinator.group.Utils.assignmentToString; import static org.apache.kafka.coordinator.group.Utils.ofSentinel; import static org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment; @@ -1398,7 +1404,29 @@ public class GroupMetadataManager { throw new InvalidRequestException("MemberEpoch is invalid."); } } + + /** + * Validates the request. + * + * @param request The request to validate. + * + * @throws InvalidRequestException if the request is not valid. + * @throws UnsupportedAssignorException if the assignor is not supported. + */ + private void throwIfStreamsInitializeRequestIsInvalid( + StreamsInitializeRequestData request + ) throws InvalidRequestException, UnsupportedAssignorException { + throwIfEmptyString(request.groupId(), "GroupId can't be empty."); + + if (request.topology().isEmpty()) { + throw new InvalidRequestException("There must be at least one subtopology."); + } + + // TODO: Check invariants + } + + /** /** * Verifies that the partitions currently owned by the member (the ones set in the * request) matches the ones that the member should own. It matches if the consumer @@ -1963,6 +1991,54 @@ public class GroupMetadataManager { return new CoordinatorResult<>(records, response); } + + /** + * Handles the initialization of the topology information on the broker side, that will be reused by all members of the group. + * + * @param groupId The group id from the request. + * @param subtopologies The list of subtopologies + * @return A Result containing the StreamsInitialize response and a list of records to update the state machine. + */ + private CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize(String groupId, + List<StreamsInitializeRequestData.Subtopology> subtopologies) + throws ApiException { + final List<CoordinatorRecord> records = new ArrayList<>(); + + // TODO: Throw if group does not exist or is not a streams group. Needs model of + // similar to final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, createIfNotExists, records); + // throwIfNull(group); + + // TODO: For the POC, only check if internal topics exist + Set<String> missingTopics = new HashSet<>(); + for (StreamsInitializeRequestData.Subtopology subtopology : subtopologies) { + for (StreamsInitializeRequestData.TopicInfo topic : subtopology.stateChangelogTopics()) { + if (metadataImage.topics().getTopic(topic.name()) == null) { + missingTopics.add(topic.name()); + } + } + for (StreamsInitializeRequestData.TopicInfo topic : subtopology.repartitionSourceTopics()) { + if (metadataImage.topics().getTopic(topic.name()) == null) { + missingTopics.add(topic.name()); + } + } + } + if (!missingTopics.isEmpty()) { + StreamsInitializeResponseData response = + new StreamsInitializeResponseData() + .setErrorCode(STREAMS_INVALID_TOPOLOGY.code()) + .setErrorMessage("Internal topics " + String.join(", ", missingTopics) + " do not exist."); + + return new CoordinatorResult<>(records, response); + } else { + records.add(newStreamsGroupTopologyRecord(groupId, subtopologies)); + + StreamsInitializeResponseData response = new StreamsInitializeResponseData(); + + return new CoordinatorResult<>(records, response); + } + + } + /** * Handle a JoinGroupRequest to a ConsumerGroup. * @@ -3563,6 +3639,46 @@ public class GroupMetadataManager { } } + /** + * Handles a ConsumerGroupHeartbeat request. + * + * @param context The request context. + * @param request The actual ConsumerGroupHeartbeat request. + * + * @return A Result containing the ConsumerGroupHeartbeat response and + * a list of records to update the state machine. + */ + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + RequestContext context, + StreamsInitializeRequestData request + ) throws ApiException { + throwIfStreamsInitializeRequestIsInvalid(request); + + return streamsInitialize( + request.groupId(), + request.topology() + ); + } + + /** + * Replays StreamsGroupTopologyKey/Value to update the hard state of + * the streams group. + * + * @param key A StreamsGroupTopologyKey key. + * @param value A StreamsGroupTopologyValue record. + */ + public void replay( + StreamsGroupTopologyKey key, + StreamsGroupTopologyValue value + ) { + + // TODO: Insert the topology information to the in-memory representation. Needs the notion + // of a Streams group +// String groupId = key.groupId(); +// StreamsGroup streamsGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); +// streamsGroup.setTopology(value); + } + /** * Handles a ShareGroupHeartbeat request. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 812ee093c2c..480c1576440 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; +import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -45,6 +46,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; @@ -89,6 +92,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newStreamsGroupTopologyRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -214,6 +218,84 @@ public class GroupCoordinatorRecordHelpersTest { )); } + @Test + public void testNewStreamsGroupTopologyRecord() { + List<StreamsInitializeRequestData.Subtopology> topology = + Collections.singletonList(new StreamsInitializeRequestData.Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new StreamsInitializeRequestData.TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new StreamsInitializeRequestData.TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new StreamsInitializeRequestData.TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new StreamsInitializeRequestData.TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + + List<StreamsGroupTopologyValue.Subtopology> expectedTopology = + Collections.singletonList(new StreamsGroupTopologyValue.Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new StreamsGroupTopologyValue.TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new StreamsGroupTopologyValue.TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + + CoordinatorRecord expectedRecord = new CoordinatorRecord( + new ApiMessageAndVersion( + new StreamsGroupTopologyKey() + .setGroupId("group-id"), + (short) 15), + new ApiMessageAndVersion( + new StreamsGroupTopologyValue() + .setTopology(expectedTopology), + (short) 0)); + + assertEquals(expectedRecord, newStreamsGroupTopologyRecord( + "group-id", + topology + )); + } + @Test public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() { Uuid fooTopicId = Uuid.randomUuid(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index bccfa57397c..96273e368e1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.Topic; @@ -53,6 +54,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -260,6 +263,118 @@ public class GroupCoordinatorServiceTest { ); } + @Test + public void testStreamsInitializeWhenNotStarted() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), + future.get() + ); + } + + @Test + public void testStreamsInitialize() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + service.startup(() -> 1); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("streams-group-initialize"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsInitializeResponseData() + )); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals(new StreamsInitializeResponseData(), future.get(5, TimeUnit.SECONDS)); + } + + private static Stream<Arguments> testStreamsInitializeWithExceptionSource() { + return Stream.of( + Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null), + Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null), + Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null), + Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null), + Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid"), + Arguments.arguments(new StreamsInvalidTopologyException("Invalid"), Errors.STREAMS_INVALID_TOPOLOGY.code(), "Invalid") + ); + } + + @ParameterizedTest + @MethodSource("testStreamsInitializeWithExceptionSource") + public void testStreamsInitializeWithException( + Throwable exception, + short expectedErrorCode, + String expectedErrorMessage + ) throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + + StreamsInitializeRequestData request = new StreamsInitializeRequestData() + .setGroupId("foo"); + + service.startup(() -> 1); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("streams-group-initialize"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(exception)); + + CompletableFuture<StreamsInitializeResponseData> future = service.streamsInitialize( + requestContext(ApiKeys.STREAMS_INITIALIZE), + request + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(expectedErrorCode) + .setErrorMessage(expectedErrorMessage), + future.get(5, TimeUnit.SECONDS) + ); + } + @Test public void testPartitionFor() { CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index cb68771c8a5..33756fe53ac 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -131,6 +133,38 @@ public class GroupCoordinatorShardTest { assertEquals(result, coordinator.consumerGroupHeartbeat(context, request)); } + @Test + public void testStreamsInitialize() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class); + CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + Time.SYSTEM, + new MockCoordinatorTimer<>(Time.SYSTEM), + mock(GroupCoordinatorConfig.class), + coordinatorMetrics, + metricsShard + ); + + RequestContext context = requestContext(ApiKeys.STREAMS_INITIALIZE); + StreamsInitializeRequestData request = new StreamsInitializeRequestData(); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = new CoordinatorResult<>( + Collections.emptyList(), + new StreamsInitializeResponseData() + ); + + when(groupMetadataManager.streamsInitialize( + context, + request + )).thenReturn(result); + + assertEquals(result, coordinator.streamsInitialize(context, request)); + } + @Test public void testCommitOffset() { GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index c5bb26d4ba7..c9f2d80ca2b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -54,6 +54,11 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeRequestData.Subtopology; +import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicConfig; +import org.apache.kafka.common.message.StreamsInitializeRequestData.TopicInfo; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -15757,4 +15762,139 @@ public class GroupMetadataManagerTest { assertEquals(expectedSuccessCount, successCount); return memberIds; } + + @Test + public void testTopologyInitialization() { + String groupId = "fooup"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "repartition"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "changelog"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build()) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.consumerGroup(groupId)); + + final List<Subtopology> topology = Collections.singletonList( + new Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = + context.streamsInitialize( + new StreamsInitializeRequestData() + .setGroupId(groupId) + .setTopology(topology) + ); + + assertEquals( + new StreamsInitializeResponseData(), + result.response() + ); + + List<CoordinatorRecord> expectedRecords = Arrays.asList( + CoordinatorRecordHelpers.newStreamsGroupTopologyRecord( + groupId, + topology + ) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testTopologyInitializationMissingInternalTopics() { + String groupId = "fooup"; + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "repartition"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build()) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> + context.groupMetadataManager.consumerGroup(groupId)); + + final List<Subtopology> topology = Collections.singletonList( + new Subtopology() + .setSubtopology("subtopology-id") + .setSinkTopics(Collections.singletonList("foo")) + .setSourceTopics(Collections.singletonList("bar")) + .setRepartitionSourceTopics( + Collections.singletonList( + new TopicInfo() + .setName("repartition") + .setPartitions(4) + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name1") + .setValue("config-value1") + )) + ) + ) + .setStateChangelogTopics( + Collections.singletonList( + new TopicInfo() + .setName("changelog") + .setTopicConfigs(Collections.singletonList( + new TopicConfig() + .setKey("config-name2") + .setValue("config-value2") + )) + ) + ) + ); + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = + context.streamsInitialize( + new StreamsInitializeRequestData() + .setGroupId(groupId) + .setTopology(topology) + ); + + assertEquals( + new StreamsInitializeResponseData() + .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) + .setErrorMessage("Internal topics changelog do not exist."), + result.response() + ); + + assertTrue(result.records().isEmpty()); + + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 4230934a499..bdcc0c9e012 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.network.ClientInformation; @@ -84,6 +86,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; @@ -673,6 +677,37 @@ public class GroupMetadataManagerTestContext { return result; } + + public CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> streamsInitialize( + StreamsInitializeRequestData request + ) { + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.STREAMS_INITIALIZE, + ApiKeys.STREAMS_INITIALIZE.latestVersion(), + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + CoordinatorResult<StreamsInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsInitialize( + context, + request + ); + + if (result.replayRecords()) { + result.records().forEach(this::replay); + } + return result; + } + public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) { time.sleep(ms); List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = timer.poll(); @@ -1583,6 +1618,13 @@ public class GroupMetadataManagerTestContext { ); break; + case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION: + groupMetadataManager.replay( + (StreamsGroupTopologyKey) key.message(), + (StreamsGroupTopologyValue) messageOrNull(value) + ); + break; + default: throw new IllegalStateException("Received an unknown record type " + key.version() + " in " + record);
