This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4686728d8839d452377e4aed4bf96afd61cdc18b
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 29 14:37:57 2024 +0100

    Resolve conflicts from 11/25 trunk rebase - Internal topic auto creation 
(#17433)
    
    * impl
    
    * fixes
---
 .../group/GroupCoordinatorAdapter.scala            |   5 +-
 .../kafka/server/AutoTopicCreationManager.scala    |  39 +++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  56 +++++-
 .../server/AutoTopicCreationManagerTest.scala      | 199 ++++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   8 +-
 .../kafka/coordinator/group/GroupCoordinator.java  |   6 +-
 .../coordinator/group/GroupCoordinatorService.java |  11 +-
 .../coordinator/group/GroupCoordinatorShard.java   |   4 +-
 .../coordinator/group/GroupMetadataManager.java    |  55 +++---
 .../streams/StreamsGroupInitializeResult.java      |  74 ++++++++
 .../group/GroupCoordinatorServiceTest.java         |  14 +-
 .../group/GroupCoordinatorShardTest.java           |   5 +-
 .../group/GroupMetadataManagerTest.java            |  51 ++++--
 .../group/GroupMetadataManagerTestContext.java     |   6 +-
 .../SmokeTestDriverIntegrationTest.java            |  13 --
 streams/src/test/resources/log4j.properties        |   2 +
 16 files changed, 447 insertions(+), 101 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index ce8bde14493..ed4322fe67a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
 
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, 
ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -26,6 +26,7 @@ import org.apache.kafka.common.requests.{OffsetCommitRequest, 
RequestContext, Tr
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.coordinator.group
 import org.apache.kafka.coordinator.group.OffsetAndMetadata
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.util.FutureUtils
@@ -80,7 +81,7 @@ private[group] class GroupCoordinatorAdapter(
   override def streamsGroupInitialize(
                                    context: RequestContext,
                                    request: StreamsGroupInitializeRequestData
-                                 ): 
CompletableFuture[StreamsGroupInitializeResponseData] = {
+                                 ): 
CompletableFuture[StreamsGroupInitializeResult] = {
     FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
       s"The old group coordinator does not support 
${ApiKeys.STREAMS_GROUP_INITIALIZE.name} API."
     ))
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 58b3035935c..3b0ec564bec 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -47,6 +47,13 @@ trait AutoTopicCreationManager {
     controllerMutationQuota: ControllerMutationQuota,
     metadataRequestContext: Option[RequestContext]
   ): Seq[MetadataResponseTopic]
+
+
+  def createStreamsInternalTopics(
+    topics: Map[String, CreatableTopic],
+    requestContext: RequestContext
+  ): Unit
+
 }
 
 object AutoTopicCreationManager {
@@ -108,6 +115,31 @@ class DefaultAutoTopicCreationManager(
     uncreatableTopicResponses ++ creatableTopicResponses
   }
 
+  override def createStreamsInternalTopics(
+    topics: Map[String, CreatableTopic],
+    requestContext: RequestContext
+  ): Unit = {
+
+    for ((_, creatableTopic) <- topics) {
+      if (creatableTopic.numPartitions() == -1) {
+        creatableTopic
+          .setNumPartitions(config.numPartitions)
+      }
+      if (creatableTopic.replicationFactor() == -1) {
+        creatableTopic
+          .setReplicationFactor(config.defaultReplicationFactor.shortValue)
+      }
+    }
+
+    if (topics.isEmpty) {
+      Seq.empty
+    } else if (controller.isEmpty || !controller.get.isActive && 
channelManager.isDefined) {
+      sendCreateTopicRequest(topics, Some(requestContext))
+    } else {
+      throw new IllegalStateException("Controller must be defined in order to 
create streams internal topics.")
+    }
+  }
+
   private def createTopicsInZk(
     creatableTopics: Map[String, CreatableTopic],
     controllerMutationQuota: ControllerMutationQuota
@@ -160,7 +192,7 @@ class DefaultAutoTopicCreationManager(
 
   private def sendCreateTopicRequest(
     creatableTopics: Map[String, CreatableTopic],
-    metadataRequestContext: Option[RequestContext]
+    requestContext: Option[RequestContext]
   ): Seq[MetadataResponseTopic] = {
     val topicsToCreate = new 
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
     topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
@@ -184,7 +216,8 @@ class DefaultAutoTopicCreationManager(
         } else if (response.versionMismatch() != null) {
           warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
invalid version exception")
         } else {
-          debug(s"Auto topic creation completed for ${creatableTopics.keys} 
with response ${response.responseBody}.")
+          // TODO: Response may still contain errors for individual topics. 
This should be exposed.
+          info(s"Auto topic creation completed for ${creatableTopics.keys} 
with response ${response.responseBody}.")
         }
       }
     }
@@ -193,7 +226,7 @@ class DefaultAutoTopicCreationManager(
       throw new IllegalStateException("Channel manager must be defined in 
order to send CreateTopic requests.")
     }
 
-    val request = metadataRequestContext.map { context =>
+    val request = requestContext.map { context =>
       val requestVersion =
         channelManager.controllerApiVersions.toScala match {
           case None =>
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5e2abe450c2..31c5aaeeccf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3891,10 +3891,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleStreamsGroupInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    // TODO: The unit tests for this method are insufficient. Once we merge 
initialize with group heartbeat, we have to extend the tests to cover ACLs and 
internal topic creation
     val streamsGroupInitializeRequest = 
request.body[StreamsGroupInitializeRequest]
 
-    // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
-
     if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
@@ -3904,6 +3903,53 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val requestContext = request.context
+
+      val internalTopics: Map[String, 
StreamsGroupInitializeRequestData.TopicInfo] = {
+        
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
+          subtopology.repartitionSourceTopics().iterator().asScala ++ 
subtopology.stateChangelogTopics().iterator().asScala
+        ).map(x => x.name() -> x).toMap
+      }
+
+      val prohibitedInternalTopics = 
internalTopics.keys.filter(Topic.isInternal)
+      if (prohibitedInternalTopics.nonEmpty) {
+        val errorResponse = new StreamsGroupInitializeResponseData()
+        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+        errorResponse.setErrorMessage(f"Use of Kafka internal topics 
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is 
prohibited.")
+        requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
+        return CompletableFuture.completedFuture[Unit](())
+      }
+
+      val invalidTopics = internalTopics.keys.filterNot(Topic.isValid)
+      if (invalidTopics.nonEmpty) {
+        val errorResponse = new StreamsGroupInitializeResponseData()
+        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
+        errorResponse.setErrorMessage(f"Internal topic names 
${invalidTopics.mkString(",")} are not valid topic names.")
+        requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
+        return CompletableFuture.completedFuture[Unit](())
+      }
+
+      // TODO: Once we move initialization to the heartbeat, we should only 
require these permissions if there are missing internal topics.
+      if(!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, 
logIfDenied = false)) {
+        val (_, createTopicUnauthorized) = 
authHelper.partitionSeqByAuthorized(request.context, CREATE, TOPIC, 
internalTopics.keys.toSeq)(identity[String])
+        if (createTopicUnauthorized.nonEmpty) {
+          val errorResponse = new StreamsGroupInitializeResponseData()
+          errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          errorResponse.setErrorMessage(f"Unauthorized to CREATE TOPIC 
${createTopicUnauthorized.mkString(",")}.")
+          requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
+          return CompletableFuture.completedFuture[Unit](())
+        }
+      }
+
+      val (_, describeConfigsAuthorized) = 
authHelper.partitionSeqByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, 
internalTopics.keys.toSeq)(identity[String])
+      if (describeConfigsAuthorized.nonEmpty) {
+        val errorResponse = new StreamsGroupInitializeResponseData()
+        errorResponse.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        errorResponse.setErrorMessage(f"Unauthorized to DESCRIBE_CONFIGS on 
topics ${describeConfigsAuthorized.mkString(",")}.")
+        requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
+        return CompletableFuture.completedFuture[Unit](())
+      }
+
       groupCoordinator.streamsGroupInitialize(
         request.context,
         streamsGroupInitializeRequest.data,
@@ -3911,7 +3957,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (exception != null) {
           requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(exception))
         } else {
-          requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(response))
+          if (!response.creatableTopics().isEmpty) {
+            // TODO: Once we move this code to the heartbeat, we should 
indicate which topics are being created. We should also find a way to propagate 
failures to the client
+            
autoTopicCreationManager.createStreamsInternalTopics(response.creatableTopics().asScala,
 requestContext);
+          }
+          requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(response.responseData()))
         }
       }
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index d86c450ea0a..12b63489613 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.message.{ApiVersionsResponseData, 
CreateTopicsRequestData}
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicConfig, CreatableTopicConfigCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -44,6 +44,7 @@ import 
org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeT
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.never
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
 
@@ -255,7 +256,7 @@ class AutoTopicCreationManagerTest {
       override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
     }
 
-    val requestContext = initializeRequestContext(topicName, userPrincipal, 
Optional.of(principalSerde))
+    val requestContext = initializeRequestContext(userPrincipal, 
Optional.of(principalSerde))
 
     autoTopicCreationManager.createTopics(
       Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
@@ -274,7 +275,7 @@ class AutoTopicCreationManagerTest {
   def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit 
= {
     val topicName = "topic"
 
-    val requestContext = initializeRequestContext(topicName, 
KafkaPrincipal.ANONYMOUS, Optional.empty())
+    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, 
Optional.empty())
 
     // Throw upon undefined principal serde when building the forward request
     assertThrows(classOf[IllegalArgumentException], () => 
autoTopicCreationManager.createTopics(
@@ -292,7 +293,7 @@ class AutoTopicCreationManagerTest {
       override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
     }
 
-    val requestContext = initializeRequestContext(topicName, 
KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde))
+    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, 
Optional.of(principalSerde))
     autoTopicCreationManager.createTopics(
       Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
     autoTopicCreationManager.createTopics(
@@ -322,9 +323,16 @@ class AutoTopicCreationManagerTest {
       argumentCaptor.capture())
   }
 
-  private def initializeRequestContext(topicName: String,
-                                       kafkaPrincipal: KafkaPrincipal,
-                                       principalSerde: 
Optional[KafkaPrincipalSerde]): RequestContext = {
+  @Test
+  def testCreateStreamsInternalTopics(): Unit = {
+    val topicConfig = new CreatableTopicConfigCollection()
+    topicConfig.add(new 
CreatableTopicConfig().setName("cleanup.policy").setValue("compact"));
+
+    val topics = Map(
+      "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2).setConfigs(topicConfig),
+      "stream-topic-2" -> new 
CreatableTopic().setName("stream-topic-2").setNumPartitions(1).setReplicationFactor(1)
+    )
+    val requestContext = initializeRequestContextWithUserPrinciple()
 
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
@@ -335,8 +343,183 @@ class AutoTopicCreationManagerTest {
       transactionCoordinator,
       Some(shareCoordinator))
 
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+    Mockito.verify(brokerToController).sendRequest(
+      argumentCaptor.capture(),
+      any(classOf[ControllerRequestCompletionHandler]))
+
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, 
"clientId", 0)
+    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
     val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
-    topicsCollection.add(getNewTopic(topicName))
+    topicsCollection.add(getNewTopic("stream-topic-1", 3, 
2.toShort).setConfigs(topicConfig))
+    topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort))
+    val requestBody = new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTopics(topicsCollection)
+        .setTimeoutMs(requestTimeout))
+      .build(0)
+
+    val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
+    assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer));
+    assertEquals(requestBody.data(), 
CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data())
+  }
+
+  @Test
+  def testCreateStreamsInternalTopicsWhenControllerNotActive(): Unit = {
+    val topics = Map(
+      "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2)
+    )
+    val requestContext = initializeRequestContextWithUserPrinciple()
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
+    Mockito.when(controller.isActive).thenReturn(false)
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    Mockito.verify(brokerToController).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+  }
+
+  @Test
+  def testFailStreamsInternalTopicsWhenNoChannelManager(): Unit = {
+    val topics = Map(
+      "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2)
+    )
+    val requestContext = initializeRequestContextWithUserPrinciple()
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      None,
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
+    Mockito.when(controller.isActive).thenReturn(true)
+
+    assertThrows(classOf[IllegalStateException], () => 
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext))
+  }
+
+  @Test
+  def testCreateStreamsInternalTopicsWithEmptyTopics(): Unit = {
+    val topics = Map.empty[String, CreatableTopic]
+    val requestContext = initializeRequestContextWithUserPrinciple()
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext)
+
+    Mockito.verify(brokerToController, never()).sendRequest(
+      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
+      any(classOf[ControllerRequestCompletionHandler]))
+  }
+
+  @Test
+  def testCreateStreamsInternalTopicsWithDefaultConfig(): Unit = {
+    val topics = Map(
+      "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
+    )
+    val requestContext = initializeRequestContextWithUserPrinciple()
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext);
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+    Mockito.verify(brokerToController).sendRequest(
+      argumentCaptor.capture(),
+      any(classOf[ControllerRequestCompletionHandler]))
+
+    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
+
+    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, 
"clientId", 0)
+    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topicsCollection.add(getNewTopic("stream-topic-1", config.numPartitions, 
config.defaultReplicationFactor.toShort))
+    val requestBody = new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTopics(topicsCollection)
+        .setTimeoutMs(requestTimeout))
+      .build(0)
+    val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
+    assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer));
+    assertEquals(requestBody.data(), 
CreateTopicsRequest.parse(forwardedRequestBuffer, 0).data())
+  }
+
+  @Test
+  def testCreateStreamsInternalTopicsPassesPrinciple(): Unit = {
+    val topics = Map(
+      "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
+    )
+    val requestContext = initializeRequestContextWithUserPrinciple()
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext);
+
+    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
+    Mockito.verify(brokerToController).sendRequest(
+      argumentCaptor.capture(),
+      any(classOf[ControllerRequestCompletionHandler]))
+    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
+    assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+  }
+
+  private def initializeRequestContextWithUserPrinciple(): RequestContext = {
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+    val principalSerde = new KafkaPrincipalSerde {
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
+        Utils.utf8(principal.toString)
+      }
+      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+    }
+    initializeRequestContext(userPrincipal, Optional.of(principalSerde))
+  }
+
+  private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal,
+                                       principalSerde: 
Optional[KafkaPrincipalSerde]): RequestContext = {
+
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      Some(brokerToController),
+      Some(adminManager),
+      Some(controller),
+      groupCoordinator,
+      transactionCoordinator,
+      Some(shareCoordinator))
+
     val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
       .setApiKey(ApiKeys.CREATE_TOPICS.id)
       .setMinVersion(0)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 8d62dadd642..d31e6fb27e7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,6 +77,8 @@ import 
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
 import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorConfigTest}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.LeaderAndIsr
@@ -11242,7 +11244,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, 
true).build())
 
-    val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
+    val future = new CompletableFuture[StreamsGroupInitializeResult]()
     when(groupCoordinator.streamsGroupInitialize(
       requestChannelRequest.context,
       streamsGroupInitializeRequest
@@ -11253,7 +11255,7 @@ class KafkaApisTest extends Logging {
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
-    val streamsGroupInitializeResponse = new 
StreamsGroupInitializeResponseData()
+    val streamsGroupInitializeResponse = new StreamsGroupInitializeResult(new 
StreamsGroupInitializeResponseData())
 
     future.complete(streamsGroupInitializeResponse)
     val response = 
verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest)
@@ -11268,7 +11270,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, 
true).build())
 
-    val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
+    val future = new CompletableFuture[StreamsGroupInitializeResult]()
     when(groupCoordinator.streamsGroupInitialize(
       requestChannelRequest.context,
       streamsGroupInitializeRequest
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 7c78db21761..07d5d7daaee 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -43,7 +43,6 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -51,6 +50,7 @@ import 
org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 
@@ -93,10 +93,10 @@ public interface GroupCoordinator {
      * @param context           The request context.
      * @param request           The StreamsGroupInitializeRequest data.
      *
-     * @return  A future yielding the response.
+     * @return  A future yielding the result, which contains the response and 
all topics to be created.
      *          The error code(s) of the response are set to indicate the 
error(s) occurred during the execution.
      */
-    CompletableFuture<StreamsGroupInitializeResponseData> 
streamsGroupInitialize(
+    CompletableFuture<StreamsGroupInitializeResult> streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 007e991ad16..3f2f4920162 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -78,6 +78,7 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSuppli
 import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.record.BrokerCompressionType;
@@ -354,14 +355,14 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
      * See {@link GroupCoordinator#streamsGroupInitialize(RequestContext, 
org.apache.kafka.common.message.StreamsGroupInitializeRequestData)}.
      */
     @Override
-    public CompletableFuture<StreamsGroupInitializeResponseData> 
streamsGroupInitialize(
+    public CompletableFuture<StreamsGroupInitializeResult> 
streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) {
         if (!isActive.get()) {
-            return CompletableFuture.completedFuture(new 
StreamsGroupInitializeResponseData()
+            return CompletableFuture.completedFuture(new 
StreamsGroupInitializeResult(new StreamsGroupInitializeResponseData()
                 .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
-            );
+            ));
         }
 
         return runtime.scheduleWriteOperation(
@@ -373,9 +374,9 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             "streams-group-initialize",
             request,
             exception,
-            (error, message) -> new StreamsGroupInitializeResponseData()
+            (error, message) -> new StreamsGroupInitializeResult(new 
StreamsGroupInitializeResponseData()
                 .setErrorCode(error.code())
-                .setErrorMessage(message),
+                .setErrorMessage(message)),
             log
         ));
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 6a2ad52ec3a..1e67ca0f882 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -43,7 +43,6 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -107,6 +106,7 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -398,7 +398,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
      * @return A Result containing the StreamsGroupInitialize response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f613d11c8f3..78e433203a4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -38,6 +38,7 @@ import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -140,8 +141,11 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -177,7 +181,6 @@ import java.util.stream.Stream;
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
-import static org.apache.kafka.common.protocol.Errors.STREAMS_INVALID_TOPOLOGY;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
@@ -2664,7 +2667,7 @@ public class GroupMetadataManager {
      * @param subtopologies The list of subtopologies.
      * @return A Result containing the StreamsGroupInitialize response and a 
list of records to update the state machine.
      */
-    private CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(String groupId,
+    private CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
streamsGroupInitialize(String groupId,
                                                                                
                        String topologyId,
                                                                                
                             
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies)
         throws ApiException {
@@ -2679,49 +2682,31 @@ public class GroupMetadataManager {
         if (!isTopologyInitializationScheduled(groupId, topologyId)) {
             log.warn("No topology to initialize for group ID {} and topology 
ID {} found.", groupId, topologyId);
             StreamsGroupInitializeResponseData response = new 
StreamsGroupInitializeResponseData();
-            return new CoordinatorResult<>(records, response);
-        }
-
-        // TODO: For the POC, only check if internal topics exist
-        Set<String> missingTopics = new HashSet<>();
-        for (StreamsGroupInitializeRequestData.Subtopology subtopology : 
subtopologies) {
-            for (StreamsGroupInitializeRequestData.TopicInfo topic : 
subtopology.stateChangelogTopics()) {
-                if (metadataImage.topics().getTopic(topic.name()) == null) {
-                    missingTopics.add(topic.name());
-                }
-            }
-            for (StreamsGroupInitializeRequestData.TopicInfo topic : 
subtopology.repartitionSourceTopics()) {
-                if (metadataImage.topics().getTopic(topic.name()) == null) {
-                    missingTopics.add(topic.name());
-                }
-            }
+            return new CoordinatorResult<>(records, new 
StreamsGroupInitializeResult(response));
         }
 
         StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(subtopologies);
 
-        cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
-
-        if (!missingTopics.isEmpty()) {
-            StreamsGroupInitializeResponseData response =
-                new StreamsGroupInitializeResponseData()
-                    .setErrorCode(STREAMS_INVALID_TOPOLOGY.code())
-                    .setErrorMessage("Internal topics " + String.join(", ", 
missingTopics) + " do not exist.");
+        final Map<String, ConfiguredSubtopology> configuredTopics =
+            InternalTopicManager.configureTopics(logContext, 
recordValue.topology(), metadataImage);
 
-            return new CoordinatorResult<>(records, response);
-        } else {
-            records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
+        cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
 
-            final Map<String, StreamsGroupTopologyValue.Subtopology> 
subtopologyMap = recordValue.topology().stream()
-                
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, 
x -> x));
+        records.add(newStreamsGroupTopologyRecord(groupId, recordValue));
 
-            final StreamsTopology topology = new StreamsTopology(topologyId, 
subtopologyMap);
+        final Map<String, StreamsGroupTopologyValue.Subtopology> 
subtopologyMap = recordValue.topology().stream()
+            
.collect(Collectors.toMap(StreamsGroupTopologyValue.Subtopology::subtopologyId, 
x -> x));
+        final StreamsTopology topology = new StreamsTopology(topologyId, 
subtopologyMap);
 
+        final Map<String, CreatableTopic> missingTopics = 
InternalTopicManager.missingTopics(configuredTopics, metadataImage);
+        // TODO: This needs to be sorted out, once we merge heartbeat and 
initialization
+        if (missingTopics.isEmpty()) {
             computeFirstTargetAssignmentAfterTopologyInitialization(group, 
records, topology);
+        }
 
-            StreamsGroupInitializeResponseData response = new 
StreamsGroupInitializeResponseData();
+        StreamsGroupInitializeResponseData response = new 
StreamsGroupInitializeResponseData();
 
-            return new CoordinatorResult<>(records, response);
-        }
+        return new CoordinatorResult<>(records, new 
StreamsGroupInitializeResult(response, missingTopics));
 
     }
 
@@ -4913,7 +4898,7 @@ public class GroupMetadataManager {
      * @return A Result containing the StreamsGroupInitialize response and
      *         a list of records to update the state machine.
      */
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
streamsGroupInitialize(
         RequestContext context,
         StreamsGroupInitializeRequestData request
     ) throws ApiException {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
new file mode 100644
index 00000000000..7a4845993eb
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupInitializeResult.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsGroupInitializeResult {
+
+    private final StreamsGroupInitializeResponseData data;
+    private final Map<String, CreatableTopic> creatableTopics;
+
+    public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData 
data, Map<String, CreatableTopic> creatableTopics) {
+        this.data = data;
+        this.creatableTopics = creatableTopics;
+    }
+
+    public StreamsGroupInitializeResult(StreamsGroupInitializeResponseData 
data) {
+        this.data = data;
+        this.creatableTopics = Collections.emptyMap();
+    }
+
+    public StreamsGroupInitializeResponseData responseData() {
+        return data;
+    }
+
+    public Map<String, CreatableTopic> creatableTopics() {
+        return creatableTopics;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsGroupInitializeResult that = 
(StreamsGroupInitializeResult) o;
+        return Objects.equals(data, that.data) && 
Objects.equals(creatableTopics,
+            that.creatableTopics);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(data, creatableTopics);
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsGroupInitializeResult{" +
+            "data=" + data +
+            ", creatableTopics=" + creatableTopics +
+            '}';
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 7505aa8c791..6ce91186adf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -78,6 +78,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.server.util.FutureUtils;
 
@@ -280,14 +281,15 @@ public class GroupCoordinatorServiceTest {
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData()
             .setGroupId("foo");
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
+        CompletableFuture<StreamsGroupInitializeResult> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
 
         assertEquals(
-            new StreamsGroupInitializeResponseData()
-                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            new StreamsGroupInitializeResult(
+                new StreamsGroupInitializeResponseData()
+                    .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())),
             future.get()
         );
     }
@@ -317,7 +319,7 @@ public class GroupCoordinatorServiceTest {
             new StreamsGroupInitializeResponseData()
         ));
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
+        CompletableFuture<StreamsGroupInitializeResult> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
@@ -368,7 +370,7 @@ public class GroupCoordinatorServiceTest {
             ArgumentMatchers.any()
         )).thenReturn(FutureUtils.failedFuture(exception));
 
-        CompletableFuture<StreamsGroupInitializeResponseData> future = 
service.streamsGroupInitialize(
+        CompletableFuture<StreamsGroupInitializeResult> future = 
service.streamsGroupInitialize(
             requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE),
             request
         );
@@ -377,7 +379,7 @@ public class GroupCoordinatorServiceTest {
             new StreamsGroupInitializeResponseData()
                 .setErrorCode(expectedErrorCode)
                 .setErrorMessage(expectedErrorMessage),
-            future.get(5, TimeUnit.SECONDS)
+            future.get(5, TimeUnit.SECONDS).responseData()
         );
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 03966c07ea8..b836131e667 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -65,6 +65,7 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
@@ -154,9 +155,9 @@ public class GroupCoordinatorShardTest {
 
         RequestContext context = 
requestContext(ApiKeys.STREAMS_GROUP_INITIALIZE);
         StreamsGroupInitializeRequestData request = new 
StreamsGroupInitializeRequestData();
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = new CoordinatorResult<>(
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result = new CoordinatorResult<>(
             Collections.emptyList(),
-            new StreamsGroupInitializeResponseData()
+            new StreamsGroupInitializeResult(new 
StreamsGroupInitializeResponseData())
         );
 
         when(groupMetadataManager.streamsGroupInitialize(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 8ff26e74f35..afdbe390021 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -41,6 +41,9 @@ import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ConsumerProtocolAssignment;
 import org.apache.kafka.common.message.ConsumerProtocolSubscription;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfigCollection;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -97,6 +100,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil;
 import org.apache.kafka.image.MetadataDelta;
@@ -567,10 +571,10 @@ public class GroupMetadataManagerTest {
             .setTopologyId(topologyId)
             .setTopology(subtopologies);
 
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result = context.streamsGroupInitialize(initialize);
 
         assertNotNull(result.response());
-        StreamsGroupInitializeResponseData response = result.response();
+        StreamsGroupInitializeResponseData response = 
result.response().responseData();
         assertEquals(Errors.NONE.code(), response.errorCode());
         List<CoordinatorRecord> coordinatorRecords = result.records();
         assertEquals(5, coordinatorRecords.size());
@@ -685,10 +689,10 @@ public class GroupMetadataManagerTest {
             .setTopologyId(wrongTopologyId)
             .setTopology(subtopologies);
 
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result = context.streamsGroupInitialize(initialize);
 
         assertNotNull(result.response());
-        StreamsGroupInitializeResponseData response = result.response();
+        StreamsGroupInitializeResponseData response = 
result.response().responseData();
         assertEquals(Errors.NONE.code(), response.errorCode());
         assertTrue(result.records().isEmpty());
     }
@@ -729,10 +733,10 @@ public class GroupMetadataManagerTest {
             .setTopology(subtopologies);
 
         context.streamsGroupInitialize(initialize);
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result = context.streamsGroupInitialize(initialize);
 
         assertNotNull(result.response());
-        StreamsGroupInitializeResponseData response = result.response();
+        StreamsGroupInitializeResponseData response = 
result.response().responseData();
         assertEquals(Errors.NONE.code(), response.errorCode());
         assertTrue(result.records().isEmpty());
     }
@@ -745,9 +749,12 @@ public class GroupMetadataManagerTest {
         String processId = "process-id";
         Uuid fooTopicId = Uuid.randomUuid();
         String fooTopicName = "repartition";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(fooTopicId, fooTopicName, 4)
+                .addTopic(barTopicId, barTopicName, 4)
                 .addRacks()
                 .build())
             .build();
@@ -776,6 +783,7 @@ public class GroupMetadataManagerTest {
                     Collections.singletonList(
                         new StreamsGroupInitializeRequestData.TopicInfo()
                             .setName("changelog")
+                            .setReplicationFactor((short) 3)
                             .setTopicConfigs(Collections.singletonList(
                                 new 
StreamsGroupInitializeRequestData.TopicConfig()
                                     .setKey("config-name2")
@@ -784,7 +792,7 @@ public class GroupMetadataManagerTest {
                     )
                 )
         );
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result =
             context.streamsGroupInitialize(
                 new StreamsGroupInitializeRequestData()
                     .setGroupId(groupId)
@@ -792,14 +800,31 @@ public class GroupMetadataManagerTest {
                     .setTopology(topology)
             );
 
+        CreatableTopicConfigCollection expectedConfig = new 
CreatableTopicConfigCollection();
+        expectedConfig.add(
+            new CreatableTopicConfig()
+                .setName("config-name2")
+                .setValue("config-value2")
+        );
+        CreatableTopic expected =
+            new CreatableTopic()
+                .setName("changelog")
+                .setNumPartitions(4)
+                .setReplicationFactor((short) 3)
+                .setConfigs(expectedConfig);
+
         assertEquals(
-            new StreamsGroupInitializeResponseData()
-                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
-                .setErrorMessage("Internal topics changelog do not exist."),
+            new StreamsGroupInitializeResult(
+                new StreamsGroupInitializeResponseData(),
+                Collections.singletonMap(
+                    "changelog", expected
+                )
+            ),
             result.response()
         );
-        
-        assertTrue(result.records().isEmpty());
+
+        // TODO: Need to check the generated records. Adapt this unit test 
after merging of
+        //       initialization & heartbeat
     }
 
     private StreamsGroupHeartbeatRequestData 
buildFirstStreamsGroupHeartbeatRequest(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 96fd18344c0..08990d9c077 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -40,7 +40,6 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
-import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -111,6 +110,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupInitializeResult;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -736,7 +736,7 @@ public class GroupMetadataManagerTestContext {
     }
 
 
-    public CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(
+    public CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
streamsGroupInitialize(
         StreamsGroupInitializeRequestData request
     ) {
         RequestContext context = new RequestContext(
@@ -755,7 +755,7 @@ public class GroupMetadataManagerTestContext {
             false
         );
 
-        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = groupMetadataManager.streamsGroupInitialize(
+        CoordinatorResult<StreamsGroupInitializeResult, CoordinatorRecord> 
result = groupMetadataManager.streamsGroupInitialize(
             context,
             request
         );
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index ab8fb57d3fb..498613ce783 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -127,19 +127,6 @@ public class SmokeTestDriverIntegrationTest extends 
IntegrationTestHarness {
         }
 
         for (final String topic: new String[]{
-            "SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog",
-            "SmokeTest-minStoreName-changelog",
-            "SmokeTest-cntByCnt-repartition",
-            "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog",
-            "SmokeTest-sum-STATE-STORE-0000000050-changelog",
-            "SmokeTest-uwin-cnt-changelog",
-            "SmokeTest-maxStoreName-changelog",
-            "SmokeTest-cntStoreName-changelog",
-            "SmokeTest-KTABLE-SUPPRESS-STATE-STORE-0000000027-changelog",
-            "SmokeTest-win-sum-changelog",
-            "SmokeTest-uwin-max-changelog",
-            "SmokeTest-uwin-min-changelog",
-            "SmokeTest-cntByCnt-changelog",
             "data",
             "echo",
             "max",
diff --git a/streams/src/test/resources/log4j.properties 
b/streams/src/test/resources/log4j.properties
index 197c8c74210..2f0f3833eb9 100644
--- a/streams/src/test/resources/log4j.properties
+++ b/streams/src/test/resources/log4j.properties
@@ -28,6 +28,8 @@ log4j.logger.org.apache.kafka.clients.consumer=INFO
 log4j.logger.org.apache.kafka.clients.producer=INFO
 log4j.logger.org.apache.kafka.streams=INFO
 log4j.logger.org.apache.kafka.coordinator.group=INFO
+log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupInitializeRequestManager=INFO
+log4j.logger.org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager=INFO
 
 # printing out the configs takes up a huge amount of the allotted characters,
 # and provides little value as we can always figure out the test configs 
without the logs

Reply via email to