This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4686728d8839d452377e4aed4bf96afd61cdc18b Author: Lucas Brutschy <[email protected]> AuthorDate: Tue Oct 29 14:37:57 2024 +0100 Resolve conflicts from 11/25 trunk rebase - Internal topic auto creation (#17433) * impl * fixes --- .../group/GroupCoordinatorAdapter.scala | 5 +- .../kafka/server/AutoTopicCreationManager.scala | 39 +++- core/src/main/scala/kafka/server/KafkaApis.scala | 56 +++++- .../server/AutoTopicCreationManagerTest.scala | 199 ++++++++++++++++++++- .../scala/unit/kafka/server/KafkaApisTest.scala | 8 +- .../kafka/coordinator/group/GroupCoordinator.java | 6 +- .../coordinator/group/GroupCoordinatorService.java | 11 +- .../coordinator/group/GroupCoordinatorShard.java | 4 +- .../coordinator/group/GroupMetadataManager.java | 55 +++--- .../streams/StreamsGroupInitializeResult.java | 74 ++++++++ .../group/GroupCoordinatorServiceTest.java | 14 +- .../group/GroupCoordinatorShardTest.java | 5 +- .../group/GroupMetadataManagerTest.java | 51 ++++-- .../group/GroupMetadataManagerTestContext.java | 6 +- .../SmokeTestDriverIntegrationTest.java | 13 -- streams/src/test/resources/log4j.properties | 2 + 16 files changed, 447 insertions(+), 101 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index ce8bde14493..ed4322fe67a 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -18,7 +18,7 @@ package kafka.coordinator.group import kafka.server.{KafkaConfig, ReplicaManager} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, 
JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] +import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...] import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch @@ -26,6 +26,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, Tr import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.coordinator.group import org.apache.kafka.coordinator.group.OffsetAndMetadata +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.util.FutureUtils @@ -80,7 +81,7 @@ private[group] class GroupCoordinatorAdapter( override def streamsGroupInitialize( context: RequestContext, request: StreamsGroupInitializeRequestData - ): CompletableFuture[StreamsGroupInitializeResponseData] = { + ): CompletableFuture[StreamsGroupInitializeResult] = { FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_INITIALIZE.name} API." 
)) diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index 58b3035935c..3b0ec564bec 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -47,6 +47,13 @@ trait AutoTopicCreationManager { controllerMutationQuota: ControllerMutationQuota, metadataRequestContext: Option[RequestContext] ): Seq[MetadataResponseTopic] + + + def createStreamsInternalTopics( + topics: Map[String, CreatableTopic], + requestContext: RequestContext + ): Unit + } object AutoTopicCreationManager { @@ -108,6 +115,31 @@ class DefaultAutoTopicCreationManager( uncreatableTopicResponses ++ creatableTopicResponses } + override def createStreamsInternalTopics( + topics: Map[String, CreatableTopic], + requestContext: RequestContext + ): Unit = { + + for ((_, creatableTopic) <- topics) { + if (creatableTopic.numPartitions() == -1) { + creatableTopic + .setNumPartitions(config.numPartitions) + } + if (creatableTopic.replicationFactor() == -1) { + creatableTopic + .setReplicationFactor(config.defaultReplicationFactor.shortValue) + } + } + + if (topics.isEmpty) { + Seq.empty + } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) { + sendCreateTopicRequest(topics, Some(requestContext)) + } else { + throw new IllegalStateException("Controller must be defined in order to create streams internal topics.") + } + } + private def createTopicsInZk( creatableTopics: Map[String, CreatableTopic], controllerMutationQuota: ControllerMutationQuota @@ -160,7 +192,7 @@ class DefaultAutoTopicCreationManager( private def sendCreateTopicRequest( creatableTopics: Map[String, CreatableTopic], - metadataRequestContext: Option[RequestContext] + requestContext: Option[RequestContext] ): Seq[MetadataResponseTopic] = { val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size) 
topicsToCreate.addAll(creatableTopics.values.asJavaCollection) @@ -184,7 +216,8 @@ class DefaultAutoTopicCreationManager( } else if (response.versionMismatch() != null) { warn(s"Auto topic creation failed for ${creatableTopics.keys} with invalid version exception") } else { - debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.") + // TODO: Response may still contain errors for individual topics. This should be exposed. + info(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.") } } } @@ -193,7 +226,7 @@ class DefaultAutoTopicCreationManager( throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.") } - val request = metadataRequestContext.map { context => + val request = requestContext.map { context => val requestVersion = channelManager.controllerApiVersions.toScala match { case None => diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5e2abe450c2..31c5aaeeccf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3891,10 +3891,9 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleStreamsGroupInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = { + // TODO: The unit tests for this method are insufficient. Once we merge initialize with group heartbeat, we have to extend the tests to cover ACLs and internal topic creation val streamsGroupInitializeRequest = request.body[StreamsGroupInitializeRequest] - // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS - if (!isStreamsGroupProtocolEnabled()) { // The API is not supported by the "old" group coordinator (the default). If the // new one is not enabled, we fail directly here. 
@@ -3904,6 +3903,53 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val requestContext = request.context + + val internalTopics: Map[String, StreamsGroupInitializeRequestData.TopicInfo] = { + streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology => + subtopology.repartitionSourceTopics().iterator().asScala ++ subtopology.stateChangelogTopics().iterator().asScala + ).map(x => x.name() -> x).toMap + } + + val prohibitedInternalTopics = internalTopics.keys.filter(Topic.isInternal) + if (prohibitedInternalTopics.nonEmpty) { + val errorResponse = new StreamsGroupInitializeResponseData() + errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code) + errorResponse.setErrorMessage(f"Use of Kafka internal topics ${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is prohibited.") + requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(errorResponse)) + return CompletableFuture.completedFuture[Unit](()) + } + + val invalidTopics = internalTopics.keys.filterNot(Topic.isValid) + if (invalidTopics.nonEmpty) { + val errorResponse = new StreamsGroupInitializeResponseData() + errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code) + errorResponse.setErrorMessage(f"Internal topic names ${invalidTopics.mkString(",")} are not valid topic names.") + requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(errorResponse)) + return CompletableFuture.completedFuture[Unit](()) + } + + // TODO: Once we move initialization to the heartbeat, we should only require these permissions if there are missing internal topics. 
+ if(!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)) { + val (_, createTopicUnauthorized) = authHelper.partitionSeqByAuthorized(request.context, CREATE, TOPIC, internalTopics.keys.toSeq)(identity[String]) + if (createTopicUnauthorized.nonEmpty) { + val errorResponse = new StreamsGroupInitializeResponseData() + errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + errorResponse.setErrorMessage(f"Unauthorized to CREATE TOPIC ${createTopicUnauthorized.mkString(",")}.") + requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(errorResponse)) + return CompletableFuture.completedFuture[Unit](()) + } + } + + val (_, describeConfigsAuthorized) = authHelper.partitionSeqByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, internalTopics.keys.toSeq)(identity[String]) + if (describeConfigsAuthorized.nonEmpty) { + val errorResponse = new StreamsGroupInitializeResponseData() + errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + errorResponse.setErrorMessage(f"Unauthorized to DESCRIBE_CONFIGS on topics ${describeConfigsAuthorized.mkString(",")} are unauthorized.") + requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(errorResponse)) + return CompletableFuture.completedFuture[Unit](()) + } + groupCoordinator.streamsGroupInitialize( request.context, streamsGroupInitializeRequest.data, @@ -3911,7 +3957,11 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, streamsGroupInitializeRequest.getErrorResponse(exception)) } else { - requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(response)) + if (!response.creatableTopics().isEmpty) { + // TODO: Once we move this code to the heartbeat, we should indicate which topics are being created. 
We should also find a way to propagate failures to the client + autoTopicCreationManager.createStreamsInternalTopics(response.creatableTopics().asScala, requestContext); + } + requestHelper.sendMaybeThrottle(request, new StreamsGroupInitializeResponse(response.responseData())) } } } diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala index d86c450ea0a..12b63489613 100644 --- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala @@ -29,7 +29,7 @@ import org.apache.kafka.common.Node import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.message.{ApiVersionsResponseData, CreateTopicsRequestData} -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic +import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -44,6 +44,7 @@ import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeT import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.never import org.mockito.invocation.InvocationOnMock import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} @@ -255,7 +256,7 @@ class AutoTopicCreationManagerTest { override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) } - val 
requestContext = initializeRequestContext(topicName, userPrincipal, Optional.of(principalSerde)) + val requestContext = initializeRequestContext(userPrincipal, Optional.of(principalSerde)) autoTopicCreationManager.createTopics( Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) @@ -274,7 +275,7 @@ class AutoTopicCreationManagerTest { def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit = { val topicName = "topic" - val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.empty()) + val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.empty()) // Throw upon undefined principal serde when building the forward request assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics( @@ -292,7 +293,7 @@ class AutoTopicCreationManagerTest { override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) } - val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde)) + val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde)) autoTopicCreationManager.createTopics( Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) autoTopicCreationManager.createTopics( @@ -322,9 +323,16 @@ class AutoTopicCreationManagerTest { argumentCaptor.capture()) } - private def initializeRequestContext(topicName: String, - kafkaPrincipal: KafkaPrincipal, - principalSerde: Optional[KafkaPrincipalSerde]): RequestContext = { + @Test + def testCreateStreamsInternalTopics(): Unit = { + val topicConfig = new CreatableTopicConfigCollection() + topicConfig.add(new CreatableTopicConfig().setName("cleanup.policy").setValue("compact")); + + val topics = Map( + "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2).setConfigs(topicConfig), + "stream-topic-2" 
-> new CreatableTopic().setName("stream-topic-2").setNumPartitions(1).setReplicationFactor(1) + ) + val requestContext = initializeRequestContextWithUserPrinciple() autoTopicCreationManager = new DefaultAutoTopicCreationManager( config, @@ -335,8 +343,183 @@ class AutoTopicCreationManagerTest { transactionCoordinator, Some(shareCoordinator)) + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) + Mockito.verify(brokerToController).sendRequest( + argumentCaptor.capture(), + any(classOf[ControllerRequestCompletionHandler])) + + val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "clientId", 0) + val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion()) val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection - topicsCollection.add(getNewTopic(topicName)) + topicsCollection.add(getNewTopic("stream-topic-1", 3, 2.toShort).setConfigs(topicConfig)) + topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort)) + val requestBody = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + .setTopics(topicsCollection) + .setTimeoutMs(requestTimeout)) + .build(0) + + val forwardedRequestBuffer = capturedRequest.requestData().duplicate() + assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer)); + assertEquals(requestBody.data(), CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data()) + } + + @Test + def testCreateStreamsInternalTopicsWhenControllerNotActive(): Unit = { + val topics = Map( + "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2) + ) + val requestContext = initializeRequestContextWithUserPrinciple() + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + 
groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + + Mockito.when(controller.isActive).thenReturn(false) + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + any(classOf[ControllerRequestCompletionHandler])) + } + + @Test + def testFailStreamsInternalTopicsWhenNoChannelManager(): Unit = { + val topics = Map( + "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2) + ) + val requestContext = initializeRequestContextWithUserPrinciple() + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + None, + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + + Mockito.when(controller.isActive).thenReturn(true) + + assertThrows(classOf[IllegalStateException], () => autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)) + } + + @Test + def testCreateStreamsInternalTopicsWithEmptyTopics(): Unit = { + val topics = Map.empty[String, CreatableTopic] + val requestContext = initializeRequestContextWithUserPrinciple() + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) + + Mockito.verify(brokerToController, never()).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + any(classOf[ControllerRequestCompletionHandler])) + } + + @Test + def testCreateStreamsInternalTopicsWithDefaultConfig(): Unit = { + val topics = Map( + "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1) + ) + val requestContext = 
initializeRequestContextWithUserPrinciple() + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext); + + val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) + Mockito.verify(brokerToController).sendRequest( + argumentCaptor.capture(), + any(classOf[ControllerRequestCompletionHandler])) + + val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion()) + + val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "clientId", 0) + val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection + topicsCollection.add(getNewTopic("stream-topic-1", config.numPartitions, config.defaultReplicationFactor.toShort)) + val requestBody = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + .setTopics(topicsCollection) + .setTimeoutMs(requestTimeout)) + .build(0) + val forwardedRequestBuffer = capturedRequest.requestData().duplicate() + assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer)); + assertEquals(requestBody.data(), CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data()) + } + + @Test + def testCreateStreamsInternalTopicsPassesPrinciple(): Unit = { + val topics = Map( + "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1) + ) + val requestContext = initializeRequestContextWithUserPrinciple() + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + + autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext); + + val 
argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) + Mockito.verify(brokerToController).sendRequest( + argumentCaptor.capture(), + any(classOf[ControllerRequestCompletionHandler])) + val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion()) + assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal))) + } + + private def initializeRequestContextWithUserPrinciple(): RequestContext = { + val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") + val principalSerde = new KafkaPrincipalSerde { + override def serialize(principal: KafkaPrincipal): Array[Byte] = { + Utils.utf8(principal.toString) + } + override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) + } + initializeRequestContext(userPrincipal, Optional.of(principalSerde)) + } + + private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal, + principalSerde: Optional[KafkaPrincipalSerde]): RequestContext = { + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator, + Some(shareCoordinator)) + val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion() .setApiKey(ApiKeys.CREATE_TOPICS.id) .setMinVersion(0) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8d62dadd642..d31e6fb27e7 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -77,6 +77,8 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG} import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr @@ -11242,7 +11244,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) - val future = new CompletableFuture[StreamsGroupInitializeResponseData]() + val future = new CompletableFuture[StreamsGroupInitializeResult]() when(groupCoordinator.streamsGroupInitialize( requestChannelRequest.context, streamsGroupInitializeRequest @@ -11253,7 +11255,7 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) - val streamsGroupInitializeResponse = new StreamsGroupInitializeResponseData() + val streamsGroupInitializeResponse = new StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData()) future.complete(streamsGroupInitializeResponse) val response = verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest) @@ -11268,7 +11270,7 @@ class KafkaApisTest extends Logging { val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build()) - val future = new CompletableFuture[StreamsGroupInitializeResponseData]() + val future = new CompletableFuture[StreamsGroupInitializeResult]() when(groupCoordinator.streamsGroupInitialize( 
requestChannelRequest.context, streamsGroupInitializeRequest diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 7c78db21761..07d5d7daaee 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -43,7 +43,6 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; -import org.apache.kafka.common.message.StreamsGroupInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -51,6 +50,7 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -93,10 +93,10 @@ public interface GroupCoordinator { * @param context The request context. * @param request The StreamsGroupInitializeRequest data. * - * @return A future yielding the response. + * @return A future yielding the result, which contains the response and all topics to be created. * The error code(s) of the response are set to indicate the error(s) occurred during the execution. 
*/ - CompletableFuture<StreamsGroupInitializeResponseData> streamsGroupInitialize( + CompletableFuture<StreamsGroupInitializeResult> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 007e991ad16..3f2f4920162 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -78,6 +78,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSuppli import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor; import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.record.BrokerCompressionType; @@ -354,14 +355,14 @@ public class GroupCoordinatorService implements GroupCoordinator { * See {@link GroupCoordinator#streamsGroupInitialize(RequestContext, org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}. 
*/ @Override - public CompletableFuture<StreamsGroupInitializeResponseData> streamsGroupInitialize( + public CompletableFuture<StreamsGroupInitializeResult> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) { if (!isActive.get()) { - return CompletableFuture.completedFuture(new StreamsGroupInitializeResponseData() + return CompletableFuture.completedFuture(new StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData() .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); + )); } return runtime.scheduleWriteOperation( @@ -373,9 +374,9 @@ public class GroupCoordinatorService implements GroupCoordinator { "streams-group-initialize", request, exception, - (error, message) -> new StreamsGroupInitializeResponseData() + (error, message) -> new StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData() .setErrorCode(error.code()) - .setErrorMessage(message), + .setErrorMessage(message)), log )); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 6a2ad52ec3a..1e67ca0f882 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -43,7 +43,6 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; -import org.apache.kafka.common.message.StreamsGroupInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ 
-107,6 +106,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey; import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -398,7 +398,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord * @return A Result containing the StreamsGroupInitialize response and * a list of records to update the state machine. */ - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( + public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f613d11c8f3..78e433203a4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerProtocolSubscription; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.DescribeGroupsResponseData; import 
org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -140,8 +141,11 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder; import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.coordinator.group.streams.StreamsGroup; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; import org.apache.kafka.coordinator.group.streams.StreamsTopology; +import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; @@ -177,7 +181,6 @@ import java.util.stream.Stream; import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; -import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; @@ -2664,7 +2667,7 @@ public class GroupMetadataManager { * @param subtopologies The list of subtopologies. * @return A Result containing the StreamsGroupInitialize response and a list of records to update the state machine. 
*/ - private CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize(String groupId, + private CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> streamsGroupInitialize(String groupId, String topologyId, List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) throws ApiException { @@ -2679,49 +2682,31 @@ public class GroupMetadataManager { if (!isTopologyInitializationScheduled(groupId, topologyId)) { log.warn("No topology to initialize for group ID {} and topology ID {} found.", groupId, topologyId); StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData(); - return new CoordinatorResult<>(records, response); - } - - // TODO: For the POC, only check if internal topics exist - Set<String> missingTopics = new HashSet<>(); - for (StreamsGroupInitializeRequestData.Subtopology subtopology : subtopologies) { - for (StreamsGroupInitializeRequestData.TopicInfo topic : subtopology.stateChangelogTopics()) { - if (metadataImage.topics().getTopic(topic.name()) == null) { - missingTopics.add(topic.name()); - } - } - for (StreamsGroupInitializeRequestData.TopicInfo topic : subtopology.repartitionSourceTopics()) { - if (metadataImage.topics().getTopic(topic.name()) == null) { - missingTopics.add(topic.name()); - } - } + return new CoordinatorResult<>(records, new StreamsGroupInitializeResult(response)); } StreamsGroupTopologyValue recordValue = convertToStreamsGroupTopologyRecord(subtopologies); - cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId); - - if (!missingTopics.isEmpty()) { - StreamsGroupInitializeResponseData response = - new StreamsGroupInitializeResponseData() - .setErrorCode(STREAMS_INVALID_TOPOLOGY.code()) - .setErrorMessage("Internal topics " + String.join(", ", missingTopics) + " do not exist."); + final Map<String, ConfiguredSubtopology> configuredTopics = + InternalTopicManager.configureTopics(logContext, recordValue.topology(), metadataImage); - 
return new CoordinatorResult<>(records, response); - } else { - records.add(newStreamsGroupTopologyRecord(groupId, recordValue)); + cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId); - final Map<String, StreamsGroupTopologyValue.Subtopology> subtopologyMap = recordValue.topology().stream() - .collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x)); + records.add(newStreamsGroupTopologyRecord(groupId, recordValue)); - final StreamsTopology topology = new StreamsTopology(topologyId, subtopologyMap); + final Map<String, StreamsGroupTopologyValue.Subtopology> subtopologyMap = recordValue.topology().stream() + .collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, x -> x)); + final StreamsTopology topology = new StreamsTopology(topologyId, subtopologyMap); + final Map<String, CreatableTopic> missingTopics = InternalTopicManager.missingTopics(configuredTopics, metadataImage); + // TODO: This needs to be sorted out, once we merge heartbeat and initialization + if (missingTopics.isEmpty()) { computeFirstTargetAssignmentAfterTopologyInitialization(group, records, topology); + } - StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData(); + StreamsGroupInitializeResponseData response = new StreamsGroupInitializeResponseData(); - return new CoordinatorResult<>(records, response); - } + return new CoordinatorResult<>(records, new StreamsGroupInitializeResult(response, missingTopics)); } @@ -4913,7 +4898,7 @@ public class GroupMetadataManager { * @return A Result containing the StreamsGroupInitialize response and * a list of records to update the state machine. 
*/ - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( + public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> streamsGroupInitialize( RequestContext context, StreamsGroupInitializeRequestData request ) throws ApiException { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java new file mode 100644 index 00000000000..7a4845993eb --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.StreamsGroupInitializeResponseData; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +public class StreamsGroupInitializeResult { + + private final StreamsGroupInitializeResponseData data; + private final Map<String, CreatableTopic> creatableTopics; + + public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData data, Map<String, CreatableTopic> creatableTopics) { + this.data = data; + this.creatableTopics = creatableTopics; + } + + public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData data) { + this.data = data; + this.creatableTopics = Collections.emptyMap(); + } + + public StreamsGroupInitializeResponseData responseData() { + return data; + } + + public Map<String, CreatableTopic> creatableTopics() { + return creatableTopics; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final StreamsGroupInitializeResult that = (StreamsGroupInitializeResult) o; + return Objects.equals(data, that.data) && Objects.equals(creatableTopics, + that.creatableTopics); + } + + @Override + public int hashCode() { + return Objects.hash(data, creatableTopics); + } + + @Override + public String toString() { + return "StreamsGroupInitializeResult{" + + "data=" + data + + ", creatableTopics=" + creatableTopics + + '}'; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 7505aa8c791..6ce91186adf 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -78,6 +78,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.util.FutureUtils; @@ -280,14 +281,15 @@ public class GroupCoordinatorServiceTest { StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData() .setGroupId("foo"); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( + CompletableFuture<StreamsGroupInitializeResult> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), request ); assertEquals( - new StreamsGroupInitializeResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), + new StreamsGroupInitializeResult( + new StreamsGroupInitializeResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())), future.get() ); } @@ -317,7 +319,7 @@ public class GroupCoordinatorServiceTest { new StreamsGroupInitializeResponseData() )); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( + CompletableFuture<StreamsGroupInitializeResult> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), request ); @@ -368,7 +370,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture(exception)); - CompletableFuture<StreamsGroupInitializeResponseData> future = service.streamsGroupInitialize( + CompletableFuture<StreamsGroupInitializeResult> future = service.streamsGroupInitialize( requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE), 
request ); @@ -377,7 +379,7 @@ public class GroupCoordinatorServiceTest { new StreamsGroupInitializeResponseData() .setErrorCode(expectedErrorCode) .setErrorMessage(expectedErrorMessage), - future.get(5, TimeUnit.SECONDS) + future.get(5, TimeUnit.SECONDS).responseData() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 03966c07ea8..b836131e667 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -65,6 +65,7 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -154,9 +155,9 @@ public class GroupCoordinatorShardTest { RequestContext context = requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE); StreamsGroupInitializeRequestData request = new StreamsGroupInitializeRequestData(); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = new CoordinatorResult<>( + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result = new CoordinatorResult<>( Collections.emptyList(), - new StreamsGroupInitializeResponseData() + new StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData()) ); when(groupMetadataManager.streamsGroupInitialize( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 8ff26e74f35..afdbe390021 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -41,6 +41,9 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.message.ConsumerProtocolAssignment; import org.apache.kafka.common.message.ConsumerProtocolSubscription; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -97,6 +100,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember; import org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers; import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil; import org.apache.kafka.image.MetadataDelta; @@ -567,10 +571,10 @@ public class GroupMetadataManagerTest { .setTopologyId(topologyId) .setTopology(subtopologies); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result 
= context.streamsGroupInitialize(initialize); assertNotNull(result.response()); - StreamsGroupInitializeResponseData response = result.response(); + StreamsGroupInitializeResponseData response = result.response().responseData(); assertEquals(Errors.NONE.code(), response.errorCode()); List<CoordinatorRecord> coordinatorRecords = result.records(); assertEquals(5, coordinatorRecords.size()); @@ -685,10 +689,10 @@ public class GroupMetadataManagerTest { .setTopologyId(wrongTopologyId) .setTopology(subtopologies); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); assertNotNull(result.response()); - StreamsGroupInitializeResponseData response = result.response(); + StreamsGroupInitializeResponseData response = result.response().responseData(); assertEquals(Errors.NONE.code(), response.errorCode()); assertTrue(result.records().isEmpty()); } @@ -729,10 +733,10 @@ public class GroupMetadataManagerTest { .setTopology(subtopologies); context.streamsGroupInitialize(initialize); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result = context.streamsGroupInitialize(initialize); assertNotNull(result.response()); - StreamsGroupInitializeResponseData response = result.response(); + StreamsGroupInitializeResponseData response = result.response().responseData(); assertEquals(Errors.NONE.code(), response.errorCode()); assertTrue(result.records().isEmpty()); } @@ -745,9 +749,12 @@ public class GroupMetadataManagerTest { String processId = "process-id"; Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "repartition"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder() .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(fooTopicId, fooTopicName, 4) + .addTopic(barTopicId, barTopicName, 4) .addRacks() .build()) .build(); @@ -776,6 +783,7 @@ public class GroupMetadataManagerTest { Collections.singletonList( new StreamsGroupInitializeRequestData.TopicInfo() .setName("changelog") + .setReplicationFactor((short) 3) .setTopicConfigs(Collections.singletonList( new StreamsGroupInitializeRequestData.TopicConfig() .setKey("config-name2") @@ -784,7 +792,7 @@ public class GroupMetadataManagerTest { ) ) ); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result = context.streamsGroupInitialize( new StreamsGroupInitializeRequestData() .setGroupId(groupId) @@ -792,14 +800,31 @@ public class GroupMetadataManagerTest { .setTopology(topology) ); + CreatableTopicConfigCollection expectedConfig = new CreatableTopicConfigCollection(); + expectedConfig.add( + new CreatableTopicConfig() + .setName("config-name2") + .setValue("config-value2") + ); + CreatableTopic expected = + new CreatableTopic() + .setName("changelog") + .setNumPartitions(4) + .setReplicationFactor((short) 3) + .setConfigs(expectedConfig); + assertEquals( - new StreamsGroupInitializeResponseData() - .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) - .setErrorMessage("Internal topics changelog do not exist."), + new StreamsGroupInitializeResult( + new StreamsGroupInitializeResponseData(), + Collections.singletonMap( + "changelog", expected + ) + ), result.response() ); - - assertTrue(result.records().isEmpty()); + + // TODO: Need to check the generated records. 
Adapt this unit test after merging of + // initialization & heartbeat } private StreamsGroupHeartbeatRequestData buildFirstStreamsGroupHeartbeatRequest( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 96fd18344c0..08990d9c077 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.message.StreamsGroupDescribeResponseData; import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; import org.apache.kafka.common.message.StreamsGroupInitializeRequestData; -import org.apache.kafka.common.message.StreamsGroupInitializeResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.network.ClientInformation; @@ -111,6 +110,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; import org.apache.kafka.coordinator.group.streams.StreamsGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; +import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult; import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -736,7 +736,7 @@ public class GroupMetadataManagerTestContext { } - public CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> streamsGroupInitialize( + public CoordinatorResult<StreamsGroupInitializeResult, 
CoordinatorRecord> streamsGroupInitialize( StreamsGroupInitializeRequestData request ) { RequestContext context = new RequestContext( @@ -755,7 +755,7 @@ public class GroupMetadataManagerTestContext { false ); - CoordinatorResult<StreamsGroupInitializeResponseData, CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize( + CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize( context, request ); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index ab8fb57d3fb..498613ce783 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -127,19 +127,6 @@ public class SmokeTestDriverIntegrationTest extends IntegrationTestHarness { } for (final String topic: new String[]{ - "SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog", - "SmokeTest-minStoreName-changelog", - "SmokeTest-cntByCnt-repartition", - "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog", - "SmokeTest-sum-STATE-STORE-0000000050-changelog", - "SmokeTest-uwin-cnt-changelog", - "SmokeTest-maxStoreName-changelog", - "SmokeTest-cntStoreName-changelog", - "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000027-changelog", - "SmokeTest-win-sum-changelog", - "SmokeTest-uwin-max-changelog", - "SmokeTest-uwin-min-changelog", - "SmokeTest-cntByCnt-changelog", "data", "echo", "max", diff --git a/streams/src/test/resources/log4j.properties b/streams/src/test/resources/log4j.properties index 197c8c74210..2f0f3833eb9 100644 --- a/streams/src/test/resources/log4j.properties +++ b/streams/src/test/resources/log4j.properties @@ -28,6 +28,8 @@ 
log4j.logger.org.apache.kafka.clients.consumer=INFO log4j.logger.org.apache.kafka.clients.producer=INFO log4j.logger.org.apache.kafka.streams=INFO log4j.logger.org.apache.kafka.coordinator.group=INFO +log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupInitializeRequestManager=INFO +log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager=INFO # printing out the configs takes up a huge amount of the allotted characters, # and provides little value as we can always figure out the test configs without the logs
