This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 427b409cf44 MINOR: Refactor auto topic creation to separate envelope 
logic (#21272)
427b409cf44 is described below

commit 427b409cf440f745ad6195673d3342f6bd3974d4
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Jan 16 09:46:12 2026 +0100

    MINOR: Refactor auto topic creation to separate envelope logic (#21272)
    
    This commit introduces a TopicCreator interface to abstract topic
    creation logic, making the code more modular and testable. The main
    changes include:
    
    - Implemented KRaftTopicCreator for KRaft-based topic creation in Java
    and removed the corresponding logic from DefaultAutoTopicCreationManager
    - Use a mock instance of TopicCreator in AutoTopicCreationManagerTest to
    better test without Mockito's ArgumentCaptor complexity.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/java/kafka/server/KRaftTopicCreator.java  | 167 ++++
 core/src/main/java/kafka/server/TopicCreator.java  |  56 ++
 .../kafka/server/AutoTopicCreationManager.scala    | 244 ++----
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 .../java/kafka/server/KRaftTopicCreatorTest.java   | 572 ++++++++++++++
 .../server/AutoTopicCreationManagerTest.scala      | 845 ++++++---------------
 6 files changed, 1090 insertions(+), 798 deletions(-)

diff --git a/core/src/main/java/kafka/server/KRaftTopicCreator.java 
b/core/src/main/java/kafka/server/KRaftTopicCreator.java
new file mode 100644
index 00000000000..6cb0c63cbcb
--- /dev/null
+++ b/core/src/main/java/kafka/server/KRaftTopicCreator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * KRaft implementation of TopicCreator that forwards CreateTopics requests to 
the controller.
+ * When creating topics with a principal, requests are wrapped in an envelope 
to preserve the
+ * original request context for authorization.
+ */
+public class KRaftTopicCreator implements TopicCreator {
+
+    private final NodeToControllerChannelManager channelManager;
+
+    public KRaftTopicCreator(NodeToControllerChannelManager channelManager) {
+        this.channelManager = channelManager;
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponse> createTopicWithPrincipal(
+        RequestContext requestContext,
+        CreateTopicsRequest.Builder createTopicsRequest
+    ) {
+        CompletableFuture<CreateTopicsResponse> responseFuture = new 
CompletableFuture<>();
+
+        short requestVersion = channelManager.controllerApiVersions()
+            .map(v -> v.latestUsableVersion(ApiKeys.CREATE_TOPICS))
+            .orElse(ApiKeys.CREATE_TOPICS.latestVersion());
+
+        RequestHeader requestHeader = new RequestHeader(
+            ApiKeys.CREATE_TOPICS,
+            requestVersion,
+            requestContext.clientId(),
+            requestContext.correlationId()
+        );
+
+        AbstractRequest.Builder<? extends AbstractRequest> envelopeRequest = 
ForwardingManager$.MODULE$.buildEnvelopeRequest(
+            requestContext,
+            createTopicsRequest.build(requestHeader.apiVersion())
+                .serializeWithHeader(requestHeader)
+        );
+
+        ControllerRequestCompletionHandler handler = new 
ControllerRequestCompletionHandler() {
+            @Override
+            public void onTimeout() {
+                responseFuture.completeExceptionally(
+                    new TimeoutException("CreateTopicsRequest to controller 
timed out")
+                );
+            }
+
+            @Override
+            public void onComplete(ClientResponse response) {
+                if (response.authenticationException() != null) {
+                    
responseFuture.completeExceptionally(response.authenticationException());
+                } else if (response.versionMismatch() != null) {
+                    
responseFuture.completeExceptionally(response.versionMismatch());
+                } else if (response.wasDisconnected()) {
+                    responseFuture.completeExceptionally(new 
IOException("Disconnected before receiving CreateTopicsResponse"));
+                } else if (response.hasResponse()) {
+                    if (response.responseBody() instanceof EnvelopeResponse 
envelopeResponse) {
+                        Errors envelopeError = envelopeResponse.error();
+                        if (envelopeError != Errors.NONE) {
+                            
responseFuture.completeExceptionally(envelopeError.exception());
+                        } else {
+                            try {
+                                CreateTopicsResponse createTopicsResponse = 
(CreateTopicsResponse) AbstractResponse.parseResponse(
+                                    envelopeResponse.responseData(),
+                                    requestHeader
+                                );
+                                responseFuture.complete(createTopicsResponse);
+                            } catch (Exception e) {
+                                responseFuture.completeExceptionally(e);
+                            }
+                        }
+                    } else {
+                        responseFuture.completeExceptionally(
+                            new IllegalStateException("Expected 
EnvelopeResponse but got: " +
+                                
response.responseBody().getClass().getSimpleName())
+                        );
+                    }
+                } else {
+                    responseFuture.completeExceptionally(
+                        new IllegalStateException("Got no response body for 
EnvelopeResponse")
+                    );
+                }
+            }
+        };
+
+        channelManager.sendRequest(envelopeRequest, handler);
+        return responseFuture;
+    }
+
+    @Override
+    public CompletableFuture<CreateTopicsResponse> createTopicWithoutPrincipal(
+        CreateTopicsRequest.Builder createTopicsRequest
+    ) {
+        CompletableFuture<CreateTopicsResponse> responseFuture = new 
CompletableFuture<>();
+
+        ControllerRequestCompletionHandler handler = new 
ControllerRequestCompletionHandler() {
+            @Override
+            public void onTimeout() {
+                responseFuture.completeExceptionally(
+                    new TimeoutException("CreateTopicsRequest to controller 
timed out")
+                );
+            }
+
+            @Override
+            public void onComplete(ClientResponse response) {
+                if (response.authenticationException() != null) {
+                    
responseFuture.completeExceptionally(response.authenticationException());
+                } else if (response.versionMismatch() != null) {
+                    
responseFuture.completeExceptionally(response.versionMismatch());
+                } else if (response.wasDisconnected()) {
+                    responseFuture.completeExceptionally(new 
IOException("Disconnected before receiving CreateTopicsResponse"));
+                } else if (response.hasResponse()) {
+                    if (response.responseBody() instanceof 
CreateTopicsResponse createTopicsResponse) {
+                        responseFuture.complete(createTopicsResponse);
+                    } else {
+                        responseFuture.completeExceptionally(
+                            new IllegalStateException("Expected 
CreateTopicsResponse but got: " +
+                                
response.responseBody().getClass().getSimpleName())
+                        );
+                    }
+                } else {
+                    responseFuture.completeExceptionally(
+                        new IllegalStateException("Got no response body for 
CreateTopicsRequest")
+                    );
+                }
+            }
+        };
+
+        channelManager.sendRequest(createTopicsRequest, handler);
+        return responseFuture;
+    }
+
+}
diff --git a/core/src/main/java/kafka/server/TopicCreator.java 
b/core/src/main/java/kafka/server/TopicCreator.java
new file mode 100644
index 00000000000..236de0f0900
--- /dev/null
+++ b/core/src/main/java/kafka/server/TopicCreator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction for creating topics via the controller.
+ * Allows different implementations to be used interchangeably
+ * by the AutoTopicCreationManager, enabling better separation of concerns and 
testability.
+ */
+public interface TopicCreator {
+
+    /**
+     * Send a create topics request with principal for user-initiated topic 
creation.
+     * The request context is used to preserve the original client principal 
for auditing.
+     *
+     * @param requestContext      The request context containing the client 
principal.
+     * @param createTopicsRequest The topics to be created.
+     * @return A future of the create topics response. This future will be 
completed on the network thread.
+     */
+    CompletableFuture<CreateTopicsResponse> createTopicWithPrincipal(
+        RequestContext requestContext,
+        CreateTopicsRequest.Builder createTopicsRequest
+    );
+
+    /**
+     * Send a create topics request without principal for internal topic 
creation (e.g., consumer offsets, transaction state).
+     * No request context is required since these are system-initiated 
requests.
+     *
+     * @param createTopicsRequest The topics to be created.
+     * @return A future of the create topics response. This future will be 
completed on the network thread.
+     */
+    CompletableFuture<CreateTopicsResponse> createTopicWithoutPrincipal(
+        CreateTopicsRequest.Builder createTopicsRequest
+    );
+}
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 5ea254fbe7a..f66aa7d3c1a 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -22,25 +22,22 @@ import java.util.concurrent.locks.ReentrantLock
 import java.util.{Collections, Properties}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{AuthenticationException, 
InvalidTopicException, TimeoutException, UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicConfig, CreatableTopicConfigCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractResponse, 
CreateTopicsRequest, CreateTopicsResponse, EnvelopeResponse, RequestContext, 
RequestHeader}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{CreateTopicsRequest, 
CreateTopicsResponse, RequestContext}
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.quota.ControllerMutationQuota
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
 
 trait AutoTopicCreationManager {
 
@@ -134,11 +131,11 @@ private[server] class ExpiringErrorCache(maxSize: Int, 
time: Time) {
 
 class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
-  channelManager: NodeToControllerChannelManager,
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator,
   shareCoordinator: ShareCoordinator,
   time: Time,
+  topicCreator: TopicCreator,
   topicErrorCacheCapacity: Int = 1000
 ) extends AutoTopicCreationManager with Logging {
 
@@ -193,7 +190,7 @@ class DefaultAutoTopicCreationManager(
     }
 
     if (topicsToCreate.nonEmpty) {
-      sendCreateTopicRequestWithErrorCaching(topicsToCreate, 
Some(requestContext), timeoutMs)
+      sendCreateTopicRequestWithErrorCaching(topicsToCreate, requestContext, 
timeoutMs)
     }
   }
 
@@ -208,99 +205,31 @@ class DefaultAutoTopicCreationManager(
     creatableTopics: Map[String, CreatableTopic],
     requestContext: Option[RequestContext]
   ): Seq[MetadataResponseTopic] = {
-    val topicsToCreate = new 
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
-    topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
-
-    val createTopicsRequest = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData()
-        .setTimeoutMs(config.requestTimeoutMs)
-        .setTopics(topicsToCreate)
-    )
+    val createTopicsRequest: CreateTopicsRequest.Builder = 
makeCreateTopicsRequestBuilder(creatableTopics)
 
-    // Capture request header information for proper envelope response parsing
-    val requestHeaderForParsing = requestContext.map { context =>
-      val requestVersion =
-        channelManager.controllerApiVersions.toScala match {
-          case None =>
-            ApiKeys.CREATE_TOPICS.latestVersion()
-          case Some(nodeApiVersions) =>
-            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
-        }
-
-      new RequestHeader(ApiKeys.CREATE_TOPICS,
-        requestVersion,
-        context.clientId,
-        context.correlationId)
+    val responseFuture = requestContext match {
+      case Some(context) => topicCreator.createTopicWithPrincipal(context, 
createTopicsRequest)
+      case None => 
topicCreator.createTopicWithoutPrincipal(createTopicsRequest)
     }
 
-    val requestCompletionHandler = new ControllerRequestCompletionHandler {
-      override def onTimeout(): Unit = {
+    responseFuture.whenComplete {
+      (response, throwable) =>
         clearInflightRequests(creatableTopics)
-        debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
-      }
-
-      override def onComplete(response: ClientResponse): Unit = {
-        clearInflightRequests(creatableTopics)
-        if (response.authenticationException() != null) {
-          warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception")
-        } else if (response.versionMismatch() != null) {
-          warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
invalid version exception")
-        } else {
-          if (response.hasResponse) {
-            response.responseBody() match {
-              case envelopeResponse: EnvelopeResponse =>
-                // Unwrap the envelope response to get the actual 
CreateTopicsResponse
-                val envelopeError = envelopeResponse.error()
-                if (envelopeError != Errors.NONE) {
-                  warn(s"Auto topic creation failed for 
${creatableTopics.keys} with envelope error: ${envelopeError}")
-                } else {
-                  requestHeaderForParsing match {
-                    case Some(requestHeader) =>
-                      try {
-                        // Use the captured request header for proper envelope 
response parsing
-                        val createTopicsResponse = 
AbstractResponse.parseResponse(
-                          envelopeResponse.responseData(), 
requestHeader).asInstanceOf[CreateTopicsResponse]
-
-                        
createTopicsResponse.data().topics().forEach(topicResult => {
-                          val error = Errors.forCode(topicResult.errorCode)
-                          if (error != Errors.NONE) {
-                            warn(s"Auto topic creation failed for 
${topicResult.name} with error '${error.name}': ${topicResult.errorMessage}")
-                          }
-                        })
-                      } catch {
-                        case e: Exception =>
-                          warn(s"Failed to parse envelope response for auto 
topic creation of ${creatableTopics.keys}", e)
-                      }
-                    case None =>
-                      warn(s"Cannot parse envelope response without original 
request header information")
-                  }
-                }
-              case createTopicsResponse: CreateTopicsResponse =>
-                createTopicsResponse.data().topics().forEach(topicResult => {
-                  val error = Errors.forCode(topicResult.errorCode)
-                  if (error != Errors.NONE) {
-                    warn(s"Auto topic creation failed for ${topicResult.name} 
with error '${error.name}': ${topicResult.errorMessage}")
-                  }
-                })
-              case other =>
-                warn(s"Auto topic creation request received unexpected 
response type: ${other.getClass.getSimpleName}")
+        // Log any errors from the topic creation attempt
+        if (throwable != null) {
+          logError(creatableTopics, throwable)
+        } else if (response != null) {
+          response.data().topics().forEach(topicResult => {
+            val error = Errors.forCode(topicResult.errorCode)
+            if (error != Errors.NONE) {
+              warn(s"Auto topic creation failed for ${topicResult.name} with 
error '${error.name}': ${topicResult.errorMessage}")
             }
-          }
-          debug(s"Auto topic creation completed for ${creatableTopics.keys} 
with response ${response.responseBody}.")
+          })
+        } else {
+          warn("CreateTopicsResponse future completed with null response and 
no exception")
         }
-      }
-    }
-
-    val request = (requestContext, requestHeaderForParsing) match {
-      case (Some(context), Some(requestHeader)) =>
-        ForwardingManager.buildEnvelopeRequest(context,
-          
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
-      case _ =>
-        createTopicsRequest
     }
 
-    channelManager.sendRequest(request, requestCompletionHandler)
-
     val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
       new MetadataResponseTopic()
         .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
@@ -401,109 +330,54 @@ class DefaultAutoTopicCreationManager(
 
   private def sendCreateTopicRequestWithErrorCaching(
     creatableTopics: Map[String, CreatableTopic],
-    requestContext: Option[RequestContext],
+    requestContext: RequestContext,
     timeoutMs: Long
-  ): Seq[MetadataResponseTopic] = {
-    val topicsToCreate = new 
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
-    topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
-
-    val createTopicsRequest = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData()
-        .setTimeoutMs(config.requestTimeoutMs)
-        .setTopics(topicsToCreate)
-    )
-
-    // Capture request header information for proper envelope response parsing
-    val requestHeaderForParsing = requestContext.map { context =>
-      val requestVersion =
-        channelManager.controllerApiVersions.toScala match {
-          case None =>
-            ApiKeys.CREATE_TOPICS.latestVersion()
-          case Some(nodeApiVersions) =>
-            nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
-        }
-
-      new RequestHeader(ApiKeys.CREATE_TOPICS,
-        requestVersion,
-        context.clientId,
-        context.correlationId)
-    }
+  ): Unit = {
+    val createTopicsRequest: CreateTopicsRequest.Builder = 
makeCreateTopicsRequestBuilder(creatableTopics)
 
-    val requestCompletionHandler = new ControllerRequestCompletionHandler {
-      override def onTimeout(): Unit = {
-        clearInflightRequests(creatableTopics)
-        debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
-        cacheTopicCreationErrors(creatableTopics.keys.toSet, "Auto topic 
creation timed out.", timeoutMs)
-      }
+    val createTopicsResponseFuture = 
topicCreator.createTopicWithPrincipal(requestContext, createTopicsRequest)
 
-      override def onComplete(response: ClientResponse): Unit = {
+    createTopicsResponseFuture.whenComplete {
+      (response, throwable) =>
         clearInflightRequests(creatableTopics)
-        if (response.authenticationException() != null) {
-          val authException = response.authenticationException()
-          warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception: ${authException.getMessage}")
-          cacheTopicCreationErrors(creatableTopics.keys.toSet, 
authException.getMessage, timeoutMs)
-        } else if (response.versionMismatch() != null) {
-          val versionException = response.versionMismatch()
-          warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
version mismatch exception: ${versionException.getMessage}")
-          cacheTopicCreationErrors(creatableTopics.keys.toSet, 
versionException.getMessage, timeoutMs)
+        // Log any errors from the topic creation attempt
+        if (throwable != null) {
+          logError(creatableTopics, throwable)
+          val errorMessage = 
Option(throwable.getMessage).getOrElse(throwable.toString)
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, errorMessage, 
timeoutMs)
+        } else if (response != null) {
+          debug(s"Auto topic creation completed for ${creatableTopics.keys} 
with response $response.")
+          cacheTopicCreationErrorsFromResponse(response, timeoutMs)
         } else {
-          if (response.hasResponse) {
-            response.responseBody() match {
-              case envelopeResponse: EnvelopeResponse =>
-                // Unwrap the envelope response to get the actual 
CreateTopicsResponse
-                val envelopeError = envelopeResponse.error()
-                if (envelopeError != Errors.NONE) {
-                  warn(s"Auto topic creation failed for 
${creatableTopics.keys} with envelope error: ${envelopeError}")
-                  cacheTopicCreationErrors(creatableTopics.keys.toSet, 
s"Envelope error: ${envelopeError}", timeoutMs)
-                } else {
-                  requestHeaderForParsing match {
-                    case Some(requestHeader) =>
-                      try {
-                        // Use the captured request header for proper envelope 
response parsing
-                        val createTopicsResponse = 
AbstractResponse.parseResponse(
-                          envelopeResponse.responseData(), 
requestHeader).asInstanceOf[CreateTopicsResponse]
-
-                        
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
-                      } catch {
-                        case e: Exception =>
-                          warn(s"Failed to parse envelope response for auto 
topic creation of ${creatableTopics.keys}", e)
-                          cacheTopicCreationErrors(creatableTopics.keys.toSet, 
s"Response parsing error: ${e.getMessage}", timeoutMs)
-                      }
-                    case None =>
-                      warn(s"Cannot parse envelope response without original 
request header information")
-                      cacheTopicCreationErrors(creatableTopics.keys.toSet, 
"Missing request header for envelope parsing", timeoutMs)
-                  }
-                }
-              case createTopicsResponse: CreateTopicsResponse =>
-                cacheTopicCreationErrorsFromResponse(createTopicsResponse, 
timeoutMs)
-              case unexpectedResponse =>
-                warn(s"Auto topic creation request received unexpected 
response type: ${unexpectedResponse.getClass.getSimpleName}")
-                cacheTopicCreationErrors(creatableTopics.keys.toSet, 
s"Unexpected response type: ${unexpectedResponse.getClass.getSimpleName}", 
timeoutMs)
-            }
-            debug(s"Auto topic creation completed for ${creatableTopics.keys} 
with response ${response.responseBody}.")
-          }
+          val ex = new IllegalStateException("CreateTopicsResponse future 
completed with null response and no exception")
+          error(s"Auto topic creation failed for ${creatableTopics.keys} due 
to unexpected future completion state", ex)
+          cacheTopicCreationErrors(creatableTopics.keys.toSet, ex.getMessage, 
timeoutMs)
         }
-      }
     }
+  }
 
-    val request = (requestContext, requestHeaderForParsing) match {
-      case (Some(context), Some(requestHeader)) =>
-        ForwardingManager.buildEnvelopeRequest(context,
-          
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
-      case _ =>
-        createTopicsRequest
+  private def logError(creatableTopics: Map[String, CreatableTopic], 
throwable: Throwable): Unit = {
+    throwable match {
+      case _: TimeoutException =>
+        debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
+      case _: AuthenticationException =>
+        warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
authentication exception")
+      case _: UnsupportedVersionException =>
+        warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
invalid version exception")
+      case other =>
+        warn(s"Auto topic creation failed for ${creatableTopics.keys} with 
exception", other)
     }
+  }
 
-    channelManager.sendRequest(request, requestCompletionHandler)
-
-    val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
-      new MetadataResponseTopic()
-        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-        .setName(topic)
-        .setIsInternal(Topic.isInternal(topic))
-    }
+  private def makeCreateTopicsRequestBuilder(creatableTopics: Map[String, 
CreatableTopic]): CreateTopicsRequest.Builder = {
+    val topicsToCreate = new 
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
+    topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
 
-    creatableTopicResponses
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicsToCreate)
+    )
   }
 
   private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: 
String, ttlMs: Long): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 67ea88b37fb..c173a9aad04 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -392,9 +392,9 @@ class BrokerServer(
         new KafkaScheduler(1, true, "transaction-log-manager-"),
         producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
 
+      val topicCreator = new 
KRaftTopicCreator(clientToControllerChannelManager)
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-        config, clientToControllerChannelManager, groupCoordinator,
-        transactionCoordinator, shareCoordinator, time)
+        config, groupCoordinator, transactionCoordinator, shareCoordinator, 
time, topicCreator)
 
       dynamicConfigHandlers = Map[ConfigType, ConfigHandler](
         ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, 
quotaManagers),
diff --git a/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java 
b/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java
new file mode 100644
index 00000000000..0a749bde44e
--- /dev/null
+++ b/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class KRaftTopicCreatorTest {
+
+    private static final int REQUEST_TIMEOUT = 100;
+
+    private NodeToControllerChannelManager brokerToController;
+    private KRaftTopicCreator kraftTopicCreator;
+
+    @BeforeEach
+    public void setup() {
+        brokerToController = mock(NodeToControllerChannelManager.class);
+
+        ApiVersionsResponseData.ApiVersion createTopicApiVersion = new 
ApiVersionsResponseData.ApiVersion()
+            .setApiKey(ApiKeys.CREATE_TOPICS.id)
+            .setMinVersion(ApiKeys.CREATE_TOPICS.oldestVersion())
+            .setMaxVersion(ApiKeys.CREATE_TOPICS.latestVersion());
+
+        when(brokerToController.controllerApiVersions())
+            
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))));
+
+        kraftTopicCreator = new KRaftTopicCreator(brokerToController);
+    }
+
+    @Test
+    public void testCreateTopicWithMetadataContextPassPrincipal() throws 
Exception {
+        String topicName = "topic";
+        KafkaPrincipal userPrincipal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user");
+        AtomicBoolean serializeIsCalled = new AtomicBoolean(false);
+
+        KafkaPrincipalSerde principalSerde = new KafkaPrincipalSerde() {
+            @Override
+            public byte[] serialize(KafkaPrincipal principal) {
+                assertEquals(principal, userPrincipal);
+                serializeIsCalled.set(true);
+                return Utils.utf8(principal.toString());
+            }
+
+            @Override
+            public KafkaPrincipal deserialize(byte[] bytes) {
+                return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+            }
+        };
+
+        RequestContext requestContext = 
initializeRequestContext(userPrincipal, Optional.of(principalSerde));
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        assertTrue(serializeIsCalled.get());
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<AbstractRequest.Builder<? extends AbstractRequest>> 
argumentCaptor =
+            (ArgumentCaptor<AbstractRequest.Builder<? extends 
AbstractRequest>>) (ArgumentCaptor<?>) 
ArgumentCaptor.forClass(AbstractRequest.Builder.class);
+        verify(brokerToController).sendRequest(
+            argumentCaptor.capture(),
+            any(ControllerRequestCompletionHandler.class));
+
+        EnvelopeRequest capturedRequest = (EnvelopeRequest) 
argumentCaptor.getValue()
+            .build(ApiKeys.ENVELOPE.latestVersion());
+        assertEquals(userPrincipal, 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal())));
+    }
+
+    @Test
+    public void 
testCreateTopicWithMetadataContextWhenPrincipalSerdeNotDefined() {
+        String topicName = "topic";
+        RequestContext requestContext = 
initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.empty());
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        assertThrows(IllegalArgumentException.class,
+            () -> kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest));
+    }
+
+    @Test
+    public void testCreateTopicWithoutRequestContext() {
+        String topicName = "topic";
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<AbstractRequest.Builder<? extends AbstractRequest>> 
argumentCaptor =
+            (ArgumentCaptor<AbstractRequest.Builder<? extends 
AbstractRequest>>) (ArgumentCaptor<?>) 
ArgumentCaptor.forClass(AbstractRequest.Builder.class);
+        verify(brokerToController).sendRequest(
+            argumentCaptor.capture(),
+            any(ControllerRequestCompletionHandler.class));
+
+        AbstractRequest.Builder<?> capturedRequest = argumentCaptor.getValue();
+        assertInstanceOf(CreateTopicsRequest.Builder.class, capturedRequest,
+            "Should send CreateTopicsRequest.Builder when no request context 
provided");
+    }
+
+    @Test
+    public void testEnvelopeResponseSuccessfulParsing() throws Exception {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        CreateTopicsResponseData createTopicsResponseData = new 
CreateTopicsResponseData();
+        CreateTopicsResponseData.CreatableTopicResult topicResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(topicName)
+                .setErrorCode(Errors.NONE.code())
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1);
+        createTopicsResponseData.topics().add(topicResult);
+
+        CreateTopicsResponse createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData);
+        short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
+        int correlationId = requestContext.correlationId();
+        String clientId = requestContext.clientId();
+
+        ResponseHeader responseHeader = new ResponseHeader(
+            correlationId,
+            ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion)
+        );
+        ByteBuffer serializedResponse = RequestUtils.serialize(
+            responseHeader.data(),
+            responseHeader.headerVersion(),
+            createTopicsResponse.data(),
+            requestVersion
+        );
+
+        EnvelopeResponse envelopeResponse = new 
EnvelopeResponse(serializedResponse, Errors.NONE);
+        RequestHeader requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 
(short) 0, clientId, correlationId);
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
envelopeResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        CreateTopicsResponse result = responseFuture.get();
+        assertEquals(1, result.data().topics().size());
+        assertEquals(topicName, 
result.data().topics().iterator().next().name());
+        assertEquals(Errors.NONE.code(), 
result.data().topics().iterator().next().errorCode());
+    }
+
+    @Test
+    public void testEnvelopeResponseWithEnvelopeError() {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        EnvelopeResponse envelopeResponse = new 
EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION);
+        RequestHeader requestHeader = new RequestHeader(
+            ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(), 
requestContext.correlationId()
+        );
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
envelopeResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        assertThrows(ExecutionException.class, responseFuture::get);
+        assertTrue(responseFuture.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testEnvelopeResponseParsingException() {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        ByteBuffer malformedData = ByteBuffer.wrap("invalid response 
data".getBytes());
+        EnvelopeResponse envelopeResponse = new 
EnvelopeResponse(malformedData, Errors.NONE);
+        RequestHeader requestHeader = new RequestHeader(
+            ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(), 
requestContext.correlationId()
+        );
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
envelopeResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+        assertTrue(responseFuture.isCompletedExceptionally());
+        ExecutionException exception = assertThrows(ExecutionException.class, 
responseFuture::get);
+        assertInstanceOf(RuntimeException.class, exception.getCause());
+    }
+
+    @Test
+    public void testEnvelopeResponseWithTopicErrors() throws Exception {
+        String topicName1 = "test-topic-1";
+        String topicName2 = "test-topic-2";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+
+        CreateTopicsRequestData.CreatableTopicCollection topicsCollection =
+            new CreateTopicsRequestData.CreatableTopicCollection();
+        topicsCollection.add(
+            new CreateTopicsRequestData.CreatableTopic()
+                .setName(topicName1)
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1)
+        );
+        topicsCollection.add(
+            new CreateTopicsRequestData.CreatableTopic()
+                .setName(topicName2)
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1)
+        );
+        CreateTopicsRequest.Builder createTopicsRequest = new 
CreateTopicsRequest.Builder(
+            new CreateTopicsRequestData()
+                .setTopics(topicsCollection)
+                .setTimeoutMs(REQUEST_TIMEOUT)
+        );
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        CreateTopicsResponseData createTopicsResponseData = new 
CreateTopicsResponseData();
+
+        CreateTopicsResponseData.CreatableTopicResult successResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(topicName1)
+                .setErrorCode(Errors.NONE.code())
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1);
+        createTopicsResponseData.topics().add(successResult);
+
+        CreateTopicsResponseData.CreatableTopicResult errorResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(topicName2)
+                .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+                .setErrorMessage("Topic already exists");
+        createTopicsResponseData.topics().add(errorResult);
+
+        CreateTopicsResponse createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData);
+        short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
+        int correlationId = requestContext.correlationId();
+        String clientId = requestContext.clientId();
+
+        ResponseHeader responseHeader = new ResponseHeader(
+            correlationId,
+            ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion)
+        );
+        ByteBuffer serializedResponse = RequestUtils.serialize(
+            responseHeader.data(),
+            responseHeader.headerVersion(),
+            createTopicsResponse.data(),
+            requestVersion
+        );
+
+        EnvelopeResponse envelopeResponse = new 
EnvelopeResponse(serializedResponse, Errors.NONE);
+        RequestHeader requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 
(short) 0, clientId, correlationId);
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
envelopeResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        CreateTopicsResponse result = responseFuture.get();
+        assertEquals(2, result.data().topics().size());
+        Map<String, CreateTopicsResponseData.CreatableTopicResult> results = 
result.data().topics().stream()
+            .collect(Collectors.toMap(
+                CreateTopicsResponseData.CreatableTopicResult::name,
+                t -> t
+            ));
+        assertEquals(Errors.NONE.code(), results.get(topicName1).errorCode());
+        assertEquals(Errors.TOPIC_ALREADY_EXISTS.code(), 
results.get(topicName2).errorCode());
+        assertEquals("Topic already exists", 
results.get(topicName2).errorMessage());
+    }
+
+    @Test
+    public void testTimeoutException() {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        argumentCaptor.getValue().onTimeout();
+
+        ExecutionException exception = assertThrows(ExecutionException.class, 
responseFuture::get);
+        assertInstanceOf(TimeoutException.class, exception.getCause());
+        assertTrue(responseFuture.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testAuthenticationException() {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        RequestHeader requestHeader = new RequestHeader(
+            ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(), 
requestContext.correlationId()
+        );
+        AuthenticationException authException = new 
AuthenticationException("Authentication failed");
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, authException, null
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        ExecutionException exception = assertThrows(ExecutionException.class, 
responseFuture::get);
+        assertInstanceOf(AuthenticationException.class, exception.getCause());
+        assertTrue(responseFuture.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testVersionMismatchException() {
+        String topicName = "test-topic";
+        RequestContext requestContext = 
initializeRequestContextWithUserPrincipal();
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithPrincipal(requestContext, 
createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        RequestHeader requestHeader = new RequestHeader(
+            ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(), 
requestContext.correlationId()
+        );
+        UnsupportedVersionException versionMismatch = new 
UnsupportedVersionException("Version mismatch");
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, versionMismatch, null, null
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        ExecutionException exception = assertThrows(ExecutionException.class, 
responseFuture::get);
+        assertInstanceOf(UnsupportedVersionException.class, 
exception.getCause());
+        assertTrue(responseFuture.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testDirectCreateTopicsResponse() throws Exception {
+        String topicName = "test-topic";
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        CreateTopicsResponseData createTopicsResponseData = new 
CreateTopicsResponseData();
+        CreateTopicsResponseData.CreatableTopicResult topicResult =
+            new CreateTopicsResponseData.CreatableTopicResult()
+                .setName(topicName)
+                .setErrorCode(Errors.NONE.code())
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1);
+        createTopicsResponseData.topics().add(topicResult);
+
+        CreateTopicsResponse createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData);
+        RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 
(short) 0, "client", 1);
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
createTopicsResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        CreateTopicsResponse result = responseFuture.get();
+        assertEquals(1, result.data().topics().size());
+        assertEquals(topicName, 
result.data().topics().iterator().next().name());
+    }
+
+    @Test
+    public void testUnexpectedResponseType() {
+        String topicName = "test-topic";
+        CreateTopicsRequest.Builder createTopicsRequest = 
createCreateTopicsRequestBuilder(topicName);
+
+        CompletableFuture<CreateTopicsResponse> responseFuture =
+            kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+        ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+            ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+        verify(brokerToController).sendRequest(
+            any(),
+            argumentCaptor.capture());
+
+        MetadataResponse unexpectedResponse = new MetadataResponse(
+            new MetadataResponseData(),
+            ApiKeys.METADATA.latestVersion()
+        );
+        RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 
(short) 0, "client", 1);
+        ClientResponse clientResponse = new ClientResponse(
+            requestHeader, null, null, 0, 0, false, null, null, 
unexpectedResponse
+        );
+
+        argumentCaptor.getValue().onComplete(clientResponse);
+
+        ExecutionException exception = assertThrows(ExecutionException.class, 
responseFuture::get);
+        assertInstanceOf(IllegalStateException.class, exception.getCause());
+        assertTrue(responseFuture.isCompletedExceptionally());
+    }
+
+    private RequestContext initializeRequestContextWithUserPrincipal() {
+        KafkaPrincipal userPrincipal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user");
+        KafkaPrincipalSerde principalSerde = new KafkaPrincipalSerde() {
+            @Override
+            public byte[] serialize(KafkaPrincipal principal) {
+                return Utils.utf8(principal.toString());
+            }
+
+            @Override
+            public KafkaPrincipal deserialize(byte[] bytes) {
+                return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+            }
+        };
+        return initializeRequestContext(userPrincipal, 
Optional.of(principalSerde));
+    }
+
+    private RequestContext initializeRequestContext(
+        KafkaPrincipal kafkaPrincipal,
+        Optional<KafkaPrincipalSerde> principalSerde
+    ) {
+        try {
+            RequestHeader requestHeader = new RequestHeader(
+                ApiKeys.METADATA,
+                ApiKeys.METADATA.latestVersion(),
+                "clientId",
+                0
+            );
+            return new RequestContext(
+                requestHeader,
+                "1",
+                InetAddress.getLocalHost(),
+                Optional.empty(),
+                kafkaPrincipal,
+                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+                SecurityProtocol.PLAINTEXT,
+                ClientInformation.EMPTY,
+                false,
+                principalSerde
+            );
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private CreateTopicsRequest.Builder 
createCreateTopicsRequestBuilder(String topicName) {
+        CreateTopicsRequestData.CreatableTopicCollection topicsCollection =
+            new CreateTopicsRequestData.CreatableTopicCollection();
+        topicsCollection.add(
+            new CreateTopicsRequestData.CreatableTopic()
+                .setName(topicName)
+                .setNumPartitions(1)
+                .setReplicationFactor((short) 1)
+        );
+        return new CreateTopicsRequest.Builder(
+            new CreateTopicsRequestData()
+                .setTopics(topicsCollection)
+                .setTimeoutMs(REQUEST_TIMEOUT)
+        );
+    }
+}
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index eee53de824f..1266176f63a 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -18,22 +18,20 @@
 package kafka.server
 
 import java.net.InetAddress
-import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Collections, Optional, Properties}
+import java.util.concurrent.CompletableFuture
+import java.util.{Optional, Properties}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, 
RequestCompletionHandler}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
-import org.apache.kafka.common.message.{ApiVersionsResponseData, 
CreateTopicsRequestData}
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicConfig, CreatableTopicConfigCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.requests.RequestUtils
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
 import org.apache.kafka.server.util.MockTime
@@ -42,16 +40,68 @@ import 
org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorCon
 import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.quota.ControllerMutationQuota
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
-import org.mockito.ArgumentMatchers.any
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-import org.mockito.Mockito.never
+import org.mockito.{ArgumentMatchers, Mockito}
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Test implementation of TopicCreator that tracks method calls and allows 
configuring responses.
+ */
+class TestTopicCreator extends TopicCreator {
+  private val withPrincipalCalls = ListBuffer[(RequestContext, 
CreateTopicsRequest.Builder)]()
+  private val withoutPrincipalCalls = ListBuffer[CreateTopicsRequest.Builder]()
+  private var withPrincipalResponse: CompletableFuture[CreateTopicsResponse] = 
_
+  private var withoutPrincipalResponse: 
CompletableFuture[CreateTopicsResponse] = _
+
+  override def createTopicWithPrincipal(
+    requestContext: RequestContext,
+    request: CreateTopicsRequest.Builder
+  ): CompletableFuture[CreateTopicsResponse] = {
+    withPrincipalCalls += ((requestContext, request))
+    if (withPrincipalResponse != null) withPrincipalResponse else 
CompletableFuture.completedFuture(null)
+  }
+
+  override def createTopicWithoutPrincipal(
+    request: CreateTopicsRequest.Builder
+  ): CompletableFuture[CreateTopicsResponse] = {
+    withoutPrincipalCalls += request
+    if (withoutPrincipalResponse != null) withoutPrincipalResponse else 
CompletableFuture.completedFuture(null)
+  }
+
+  def setResponseForWithPrincipal(response: CreateTopicsResponse): Unit = {
+    withPrincipalResponse = CompletableFuture.completedFuture(response)
+  }
+
+  def setResponseForWithoutPrincipal(response: CreateTopicsResponse): Unit = {
+    withoutPrincipalResponse = CompletableFuture.completedFuture(response)
+  }
+
+  def setFutureForWithPrincipal(future: 
CompletableFuture[CreateTopicsResponse]): Unit = {
+    withPrincipalResponse = future
+  }
+
+  def setFutureForWithoutPrincipal(future: 
CompletableFuture[CreateTopicsResponse]): Unit = {
+    withoutPrincipalResponse = future
+  }
+
+  def getWithPrincipalCalls: List[(RequestContext, 
CreateTopicsRequest.Builder)] = withPrincipalCalls.toList
+  def getWithoutPrincipalCalls: List[CreateTopicsRequest.Builder] = 
withoutPrincipalCalls.toList
+
+  def withPrincipalCallCount: Int = withPrincipalCalls.size
+  def withoutPrincipalCallCount: Int = withoutPrincipalCalls.size
+
+  def reset(): Unit = {
+    withPrincipalCalls.clear()
+    withoutPrincipalCalls.clear()
+    withPrincipalResponse = null
+    withoutPrincipalResponse = null
+  }
+}
 
 class AutoTopicCreationManagerTest {
 
@@ -59,7 +109,7 @@ class AutoTopicCreationManagerTest {
   private val testCacheCapacity = 3
   private var config: KafkaConfig = _
   private val metadataCache = Mockito.mock(classOf[MetadataCache])
-  private val brokerToController = 
Mockito.mock(classOf[NodeToControllerChannelManager])
+  private val topicCreator = new TestTopicCreator()
   private val groupCoordinator = Mockito.mock(classOf[GroupCoordinator])
   private val transactionCoordinator = 
Mockito.mock(classOf[TransactionCoordinator])
   private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator])
@@ -81,13 +131,12 @@ class AutoTopicCreationManagerTest {
     props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
     
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
     
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, 
internalTopicReplicationFactor.toString)
-    // Set a short group max session timeout for testing TTL (1 second)
-    
props.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 
"1000")
 
     config = KafkaConfig.fromProps(props)
     val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1, 
"host1", 1))
 
-    
Mockito.when(metadataCache.getAliveBrokerNodes(any(classOf[ListenerName]))).thenReturn(aliveBrokers)
+    
Mockito.when(metadataCache.getAliveBrokerNodes(ArgumentMatchers.any(classOf[ListenerName]))).thenReturn(aliveBrokers)
+    topicCreator.reset()
   }
 
   @Test
@@ -119,109 +168,86 @@ class AutoTopicCreationManagerTest {
                               replicationFactor: Short = 1): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
-    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
-    topicsCollection.add(getNewTopic(topicName, numPartitions, 
replicationFactor))
-    val requestBody = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData()
-        .setTopics(topicsCollection)
-        .setTimeoutMs(requestTimeout))
+    // Set up the topicCreator to return a successful response
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
+      .setName(topicName)
+      .setErrorCode(Errors.NONE.code())
+    createTopicsResponseData.topics().add(topicResult)
+    val response = new CreateTopicsResponse(createTopicsResponseData)
+    topicCreator.setResponseForWithoutPrincipal(response)
 
-    // Calling twice with the same topic will only trigger one forwarding.
-    createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName, 
isInternal)
+    // First call to create topic - should trigger the topic creator
     createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName, 
isInternal)
 
-    Mockito.verify(brokerToController).sendRequest(
-      ArgumentMatchers.eq(requestBody),
-      any(classOf[ControllerRequestCompletionHandler]))
-  }
+    assertEquals(1, topicCreator.withoutPrincipalCallCount, "Should have 
called createTopicWithoutPrincipal once")
 
-  @Test
-  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
-    val topicName = "topic"
-
-    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
-    val serializeIsCalled = new AtomicBoolean(false)
-    val principalSerde = new KafkaPrincipalSerde {
-      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
-        assertEquals(principal, userPrincipal)
-        serializeIsCalled.set(true)
-        Utils.utf8(principal.toString)
-      }
-      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
-    }
+    // Reset the topicCreator to verify the second call
+    topicCreator.reset()
+    topicCreator.setResponseForWithoutPrincipal(response)
 
-    val requestContext = initializeRequestContext(userPrincipal, 
Optional.of(principalSerde))
+    // Second call - should also trigger topicCreator because inflight is 
cleared after first call completes
+    createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName, 
isInternal)
 
-    autoTopicCreationManager.createTopics(
-      Set(topicName), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext))
+    assertEquals(1, topicCreator.withoutPrincipalCallCount, "Should have 
called createTopicWithoutPrincipal once more")
 
-    assertTrue(serializeIsCalled.get())
+    // Verify the request builder matches expected values
+    val capturedRequest = topicCreator.getWithoutPrincipalCalls.head.build()
+    assertEquals(requestTimeout, capturedRequest.data().timeoutMs())
+    assertEquals(1, capturedRequest.data().topics().size())
 
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
-    Mockito.verify(brokerToController).sendRequest(
-      argumentCaptor.capture(),
-      any(classOf[ControllerRequestCompletionHandler]))
-    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
-    assertEquals(userPrincipal, 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+    // Validate request
+    val topic = capturedRequest.data().topics().iterator().next()
+    assertEquals(topicName, topic.name())
+    assertEquals(numPartitions, topic.numPartitions())
+    assertEquals(replicationFactor, topic.replicationFactor())
   }
 
   @Test
-  def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit 
= {
-    val topicName = "topic"
-
-    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, 
Optional.empty())
-
-    // Throw upon undefined principal serde when building the forward request
-    assertThrows(classOf[IllegalArgumentException], () => 
autoTopicCreationManager.createTopics(
-      Set(topicName), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext)))
-  }
+  def testTopicCreationWithMetadataContext(): Unit = {
+    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+      config,
+      groupCoordinator,
+      transactionCoordinator,
+      shareCoordinator,
+      mockTime,
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity)
 
-  @Test
-  def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): 
Unit = {
     val topicName = "topic"
-
+    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
     val principalSerde = new KafkaPrincipalSerde {
-      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
-        Utils.utf8(principal.toString)
-      }
+      override def serialize(principal: KafkaPrincipal): Array[Byte] = 
Utils.utf8(principal.toString)
       override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
     }
 
-    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, 
Optional.of(principalSerde))
-    autoTopicCreationManager.createTopics(
-      Set(topicName), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext))
-    autoTopicCreationManager.createTopics(
-      Set(topicName), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext))
+    val requestContext = initializeRequestContext(userPrincipal, 
Optional.of(principalSerde))
+
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
+      .setName(topicName)
+      .setErrorCode(Errors.NONE.code())
+    createTopicsResponseData.topics().add(topicResult)
+    val response = new CreateTopicsResponse(createTopicsResponseData)
+    topicCreator.setResponseForWithPrincipal(response)
 
-    // Should only trigger once
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Complete with unsupported version will not trigger a retry, but cleanup 
the inflight topics instead
-    val header = new RequestHeader(ApiKeys.ENVELOPE, 0, "client", 1)
-    val response = new EnvelopeResponse(ByteBuffer.allocate(0), 
Errors.UNSUPPORTED_VERSION)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, response)
-    
argumentCaptor.getValue.asInstanceOf[RequestCompletionHandler].onComplete(clientResponse)
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Could do the send again as inflight topics are cleared.
     autoTopicCreationManager.createTopics(
       Set(topicName), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext))
-    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
+
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
+    val calls = topicCreator.getWithPrincipalCalls
+    assertEquals(requestContext, calls.head._1)
+
+    val capturedRequest = calls.head._2.build()
+    assertEquals(1, capturedRequest.data().topics().size())
+    assertEquals(topicName, 
capturedRequest.data().topics().iterator().next().name())
   }
 
   @Test
@@ -237,35 +263,38 @@ class AutoTopicCreationManagerTest {
 
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity
+    )
+
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new CreatableTopicResult()
+        .setName("stream-topic-1")
+        .setErrorCode(Errors.NONE.code()))
+    createTopicsResponseData.topics().add(
+      new CreatableTopicResult()
+        .setName("stream-topic-2")
+        .setErrorCode(Errors.NONE.code()))
+    val response = new CreateTopicsResponse(createTopicsResponseData)
+    topicCreator.setResponseForWithPrincipal(response)
 
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
 
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
-    Mockito.verify(brokerToController).sendRequest(
-      argumentCaptor.capture(),
-      any(classOf[ControllerRequestCompletionHandler]))
-
-    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, 
ApiKeys.CREATE_TOPICS.latestVersion(), "clientId", 0)
-    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
-    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
-    topicsCollection.add(getNewTopic("stream-topic-1", 3, 
2.toShort).setConfigs(topicConfig))
-    topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort))
-    val requestBody = new CreateTopicsRequest.Builder(
-      new CreateTopicsRequestData()
-        .setTopics(topicsCollection)
-        .setTimeoutMs(requestTimeout))
-      .build(ApiKeys.CREATE_TOPICS.latestVersion())
-
-    val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
-    assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer))
-    assertEquals(requestBody.data(), CreateTopicsRequest.parse(new 
ByteBufferAccessor(forwardedRequestBuffer),
-      ApiKeys.CREATE_TOPICS.latestVersion()).data())
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
+    val calls = topicCreator.getWithPrincipalCalls
+    assertEquals(requestContext, calls.head._1)
+
+    val capturedRequest = calls.head._2.build()
+    assertEquals(requestTimeout, capturedRequest.data().timeoutMs())
+    assertEquals(2, capturedRequest.data().topics().size())
+    val topicNames = 
capturedRequest.data().topics().asScala.map(_.name()).toSet
+    assertTrue(topicNames.contains("stream-topic-1"))
+    assertTrue(topicNames.contains("stream-topic-2"))
   }
 
   @Test
@@ -275,22 +304,20 @@ class AutoTopicCreationManagerTest {
 
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
 
-    Mockito.verify(brokerToController, never()).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(0, topicCreator.withPrincipalCallCount, "Should not have 
called createTopicWithPrincipal")
   }
 
   @Test
-  def testCreateStreamsInternalTopicsPassesPrincipal(): Unit = {
+  def testCreateStreamsInternalTopicsPassesRequestContext(): Unit = {
     val topics = Map(
       "stream-topic-1" -> new 
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
     )
@@ -298,21 +325,27 @@ class AutoTopicCreationManagerTest {
 
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity
+    )
+
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    createTopicsResponseData.topics().add(
+      new CreatableTopicResult()
+        .setName("stream-topic-1")
+        .setErrorCode(Errors.NONE.code()))
+    val response = new CreateTopicsResponse(createTopicsResponseData)
+    topicCreator.setResponseForWithPrincipal(response)
 
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
 
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
-    Mockito.verify(brokerToController).sendRequest(
-      argumentCaptor.capture(),
-      any(classOf[ControllerRequestCompletionHandler]))
-    val capturedRequest = 
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
-    assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
+    val calls = topicCreator.getWithPrincipalCalls
+    assertEquals(requestContext, calls.head._1)
   }
 
   private def initializeRequestContextWithUserPrincipal(): RequestContext = {
@@ -328,23 +361,6 @@ class AutoTopicCreationManagerTest {
 
   private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal,
                                        principalSerde: 
Optional[KafkaPrincipalSerde]): RequestContext = {
-
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
-      .setApiKey(ApiKeys.CREATE_TOPICS.id)
-      .setMinVersion(ApiKeys.CREATE_TOPICS.oldestVersion())
-      .setMaxVersion(ApiKeys.CREATE_TOPICS.latestVersion())
-    Mockito.when(brokerToController.controllerApiVersions())
-      
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
-
     val requestHeader = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
       "clientId", 0)
     new RequestContext(requestHeader, "1", InetAddress.getLocalHost, 
Optional.empty(),
@@ -367,51 +383,35 @@ class AutoTopicCreationManagerTest {
     assertEquals(expectedResponses, topicResponses)
   }
 
-  private def getNewTopic(topicName: String, numPartitions: Int, 
replicationFactor: Short): CreatableTopic = {
-    new CreatableTopic()
-      .setName(topicName)
-      .setNumPartitions(numPartitions)
-      .setReplicationFactor(replicationFactor)
-  }
-
   @Test
   def testTopicCreationErrorCaching(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity
+    )
 
     val topics = Map(
       "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
     )
     val requestContext = initializeRequestContextWithUserPrincipal()
 
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
     // Simulate a CreateTopicsResponse with errors
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
       .setName("test-topic-1")
       .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
       .setErrorMessage("Topic 'test-topic-1' already exists.")
     createTopicsResponseData.topics().add(topicResult)
 
     val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
+    topicCreator.setResponseForWithPrincipal(createTopicsResponse)
 
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
 
     // Verify that the error was cached
     val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"),
 mockTime.milliseconds())
@@ -424,45 +424,38 @@ class AutoTopicCreationManagerTest {
   def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity
+    )
 
     val topics = Map(
       "success-topic" -> new 
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
       "failed-topic" -> new 
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
     )
     val requestContext = initializeRequestContextWithUserPrincipal()
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
 
     // Simulate mixed response - one success, one failure
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
+    val createTopicsResponseData = new CreateTopicsResponseData()
     createTopicsResponseData.topics().add(
-      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      new CreatableTopicResult()
         .setName("success-topic")
         .setErrorCode(Errors.NONE.code())
     )
     createTopicsResponseData.topics().add(
-      new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      new CreatableTopicResult()
         .setName("failed-topic")
         .setErrorCode(Errors.POLICY_VIOLATION.code())
         .setErrorMessage("Policy violation")
     )
 
     val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
+    topicCreator.setResponseForWithPrincipal(createTopicsResponse)
 
-    argumentCaptor.getValue.onComplete(clientResponse)
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
 
     // Only the failed topic should be cached
     val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic",
 "failed-topic", "nonexistent-topic"), mockTime.milliseconds())
@@ -475,13 +468,13 @@ class AutoTopicCreationManagerTest {
   def testErrorCacheTTL(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
+      topicCreator,
+      topicErrorCacheCapacity = testCacheCapacity
+    )
 
     // First cache an error by simulating topic creation failure
     val topics = Map(
@@ -489,28 +482,19 @@ class AutoTopicCreationManagerTest {
     )
     val requestContext = initializeRequestContextWithUserPrincipal()
     val shortTtlMs = 1000L // Use 1 second TTL for faster testing
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
 
     // Simulate a CreateTopicsResponse with error
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
       .setName("test-topic")
       .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
       .setErrorMessage("Invalid replication factor")
     createTopicsResponseData.topics().add(topicResult)
 
     val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
+    topicCreator.setResponseForWithPrincipal(createTopicsResponse)
 
-    // Cache the error at T0
-    argumentCaptor.getValue.onComplete(clientResponse)
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
 
     // Verify error is cached and accessible within TTL
     val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
@@ -525,334 +509,16 @@ class AutoTopicCreationManagerTest {
     assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively 
cleaned up")
   }
 
-  @Test
-  def testEnvelopeResponseSuccessfulParsing(): Unit = {
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val topics = Map(
-      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
-    )
-    val requestContext = initializeRequestContextWithUserPrincipal()
-    val timeoutMs = 5000L
-
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create a successful CreateTopicsResponse
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
-      .setName("test-topic")
-      .setErrorCode(Errors.NONE.code())
-      .setNumPartitions(1)
-      .setReplicationFactor(1.toShort)
-    createTopicsResponseData.topics().add(topicResult)
-
-    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
-    val correlationId = requestContext.correlationId // Use the actual 
correlation ID from request context
-    val clientId = requestContext.clientId
-
-    // Serialize the CreateTopicsResponse with header as it would appear in an 
envelope
-    val responseHeader = new ResponseHeader(correlationId, 
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
-    val serializedResponse = RequestUtils.serialize(responseHeader.data(), 
responseHeader.headerVersion(), 
-                                                     
createTopicsResponse.data(), requestVersion)
-
-    // Create an EnvelopeResponse containing the serialized 
CreateTopicsResponse
-    val envelopeResponse = new EnvelopeResponse(serializedResponse, 
Errors.NONE)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, 
correlationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // Verify no errors were cached (successful response)
-    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
-    assertTrue(cachedErrors.isEmpty, "No errors should be cached for 
successful response")
-  }
-
-  @Test
-  def testEnvelopeResponseWithEnvelopeError(): Unit = {
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val topics = Map(
-      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
-    )
-    val requestContext = initializeRequestContextWithUserPrincipal()
-    val timeoutMs = 5000L
-
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create an EnvelopeResponse with an envelope-level error
-    val envelopeResponse = new EnvelopeResponse(ByteBuffer.allocate(0), 
Errors.UNSUPPORTED_VERSION)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, 
requestContext.clientId, requestContext.correlationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // Verify the envelope error was cached
-    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
-    assertEquals(1, cachedErrors.size)
-    assertTrue(cachedErrors("test-topic").contains("Envelope error: 
UNSUPPORTED_VERSION"))
-  }
-
-  @Test
-  def testEnvelopeResponseParsingException(): Unit = {
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val topics = Map(
-      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
-    )
-    val requestContext = initializeRequestContextWithUserPrincipal()
-    val timeoutMs = 5000L
-
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create an EnvelopeResponse with malformed response data that will cause 
parsing to fail
-    val malformedData = ByteBuffer.wrap("invalid response data".getBytes())
-    val envelopeResponse = new EnvelopeResponse(malformedData, Errors.NONE)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, 
requestContext.clientId, requestContext.correlationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // Verify the parsing error was cached
-    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
-    assertEquals(1, cachedErrors.size)
-    assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
-  }
-
-  @Test
-  def testEnvelopeResponseCorrelationIdMismatch(): Unit = {
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val topics = Map(
-      "test-topic" -> new 
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
-    )
-    val requestContext = initializeRequestContextWithUserPrincipal()
-    val timeoutMs = 5000L
-
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create a CreateTopicsResponse with a different correlation ID than the 
request
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
-      .setName("test-topic")
-      .setErrorCode(Errors.NONE.code())
-    createTopicsResponseData.topics().add(topicResult)
-
-    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
-    val requestCorrelationId = 123
-    val responseCorrelationId = 456 // Different correlation ID
-    val clientId = "test-client"
-
-    // Serialize the CreateTopicsResponse with mismatched correlation ID
-    val responseHeader = new ResponseHeader(responseCorrelationId, 
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
-    val serializedResponse = RequestUtils.serialize(responseHeader.data(), 
responseHeader.headerVersion(),
-                                                     
createTopicsResponse.data(), requestVersion)
-
-    // Create an EnvelopeResponse containing the serialized 
CreateTopicsResponse
-    val envelopeResponse = new EnvelopeResponse(serializedResponse, 
Errors.NONE)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, 
requestCorrelationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // Verify the correlation ID mismatch error was cached
-    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
-    assertEquals(1, cachedErrors.size)
-    assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
-  }
-
-  @Test
-  def testEnvelopeResponseWithTopicErrors(): Unit = {
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val topics = Map(
-      "test-topic-1" -> new 
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1),
-      "test-topic-2" -> new 
CreatableTopic().setName("test-topic-2").setNumPartitions(1).setReplicationFactor(1)
-    )
-    val requestContext = initializeRequestContextWithUserPrincipal()
-    val timeoutMs = 5000L
-
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create a CreateTopicsResponse with mixed success and error results
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-
-    // Successful topic
-    val successResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
-      .setName("test-topic-1")
-      .setErrorCode(Errors.NONE.code())
-      .setNumPartitions(1)
-      .setReplicationFactor(1.toShort)
-    createTopicsResponseData.topics().add(successResult)
-
-    // Failed topic
-    val errorResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
-      .setName("test-topic-2")
-      .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
-      .setErrorMessage("Topic already exists")
-    createTopicsResponseData.topics().add(errorResult)
-
-    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
-    val correlationId = requestContext.correlationId  // Use the actual 
correlation ID from request context
-    val clientId = requestContext.clientId
-
-    // Serialize the CreateTopicsResponse with header
-    val responseHeader = new ResponseHeader(correlationId, 
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
-    val serializedResponse = RequestUtils.serialize(responseHeader.data(), 
responseHeader.headerVersion(),
-                                                     
createTopicsResponse.data(), requestVersion)
-
-    // Create an EnvelopeResponse containing the serialized 
CreateTopicsResponse
-    val envelopeResponse = new EnvelopeResponse(serializedResponse, 
Errors.NONE)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, 
correlationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // Verify only the failed topic was cached
-    val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(
-      Set("test-topic-1", "test-topic-2"), mockTime.milliseconds())
-    
-    assertEquals(1, cachedErrors.size, s"Expected only 1 error but found: 
$cachedErrors")
-    assertTrue(cachedErrors.contains("test-topic-2"))
-    assertEquals("Topic already exists", cachedErrors("test-topic-2"))
-  }
-
-  @Test
-  def testSendCreateTopicRequestEnvelopeHandling(): Unit = {
-    // Test the sendCreateTopicRequest method (without error caching) handles 
envelopes correctly
-    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
-      config,
-      brokerToController,
-      groupCoordinator,
-      transactionCoordinator,
-      shareCoordinator,
-      mockTime,
-      topicErrorCacheCapacity = testCacheCapacity)
-
-    val requestContext = initializeRequestContextWithUserPrincipal()
-
-    // Call createTopics which uses sendCreateTopicRequest internally
-    autoTopicCreationManager.createTopics(
-      Set("test-topic"), 
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, 
Some(requestContext))
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
-    // Create a CreateTopicsResponse with an error
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
-      .setName("test-topic")
-      .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())
-      .setErrorMessage("Invalid topic name")
-    createTopicsResponseData.topics().add(topicResult)
-
-    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
-    val correlationId = requestContext.correlationId  // Use the actual 
correlation ID from request context
-    val clientId = requestContext.clientId
-
-    // Serialize the CreateTopicsResponse with header
-    val responseHeader = new ResponseHeader(correlationId, 
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
-    val serializedResponse = RequestUtils.serialize(responseHeader.data(), 
responseHeader.headerVersion(),
-                                                     
createTopicsResponse.data(), requestVersion)
-
-    // Create an EnvelopeResponse containing the serialized 
CreateTopicsResponse
-    val envelopeResponse = new EnvelopeResponse(serializedResponse, 
Errors.NONE)
-    val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, 
correlationId)
-    val clientResponse = new ClientResponse(requestHeader, null, null,
-      0, 0, false, null, null, envelopeResponse)
-
-    // Trigger the completion handler
-    argumentCaptor.getValue.onComplete(clientResponse)
-
-    // For sendCreateTopicRequest, errors are not cached, but we can verify 
the handler completed without exception
-    // The test passes if no exception is thrown during envelope processing
-  }
-
   @Test
   def testErrorCacheExpirationBasedEviction(): Unit = {
     // Create manager with small cache size for testing
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = 3)
     
     val requestContext = initializeRequestContextWithUserPrincipal()
@@ -866,31 +532,21 @@ class AutoTopicCreationManagerTest {
         topicName -> new 
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
       )
       
-      autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
-      
-      val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-      Mockito.verify(brokerToController, Mockito.atLeastOnce()).sendRequest(
-        any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-        argumentCaptor.capture())
-      
       // Simulate error response for this topic
-      val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-      val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+      val createTopicsResponseData = new CreateTopicsResponseData()
+      val topicResult = new CreatableTopicResult()
         .setName(topicName)
         .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
         .setErrorMessage(s"Topic '$topicName' already exists.")
       createTopicsResponseData.topics().add(topicResult)
       
       val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-      val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-      val clientResponse = new ClientResponse(header, null, null,
-        0, 0, false, null, null, createTopicsResponse)
-      
-      argumentCaptor.getValue.onComplete(clientResponse)
+      topicCreator.setResponseForWithPrincipal(createTopicsResponse)
+
+      autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() 
* 2)
       
       // Advance time slightly between additions to ensure different timestamps
       mockTime.sleep(10)
-      
     }
     
     // With cache size of 3, topics 1 and 2 should have been evicted
@@ -909,11 +565,11 @@ class AutoTopicCreationManagerTest {
   def testTopicsInBackoffAreNotRetried(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
     val topics = Map(
@@ -922,28 +578,19 @@ class AutoTopicCreationManagerTest {
     val requestContext = initializeRequestContextWithUserPrincipal()
     val timeoutMs = 5000L
 
-    // First attempt - trigger topic creation
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
     // Simulate error response to cache the error
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
       .setName("test-topic")
       .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
       .setErrorMessage("Invalid replication factor")
     createTopicsResponseData.topics().add(topicResult)
 
     val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
+    topicCreator.setResponseForWithPrincipal(createTopicsResponse)
 
-    argumentCaptor.getValue.onComplete(clientResponse)
+    // First attempt - trigger topic creation
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
 
     // Verify error is cached
     val cachedErrors = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
@@ -953,20 +600,18 @@ class AutoTopicCreationManagerTest {
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
 
     // Verify still only one request was sent (not retried during back-off)
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
   }
 
   @Test
   def testTopicsOutOfBackoffCanBeRetried(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
     val topics = Map(
@@ -975,28 +620,19 @@ class AutoTopicCreationManagerTest {
     val requestContext = initializeRequestContextWithUserPrincipal()
     val shortTtlMs = 1000L
 
-    // First attempt - trigger topic creation
-    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
-
-    val argumentCaptor = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor.capture())
-
     // Simulate error response to cache the error
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+    val createTopicsResponseData = new CreateTopicsResponseData()
+    val topicResult = new CreatableTopicResult()
       .setName("test-topic")
       .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
       .setErrorMessage("Invalid replication factor")
     createTopicsResponseData.topics().add(topicResult)
 
     val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
+    topicCreator.setResponseForWithPrincipal(createTopicsResponse)
 
-    argumentCaptor.getValue.onComplete(clientResponse)
+    // First attempt - trigger topic creation
+    autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
 
     // Verify error is cached
     val cachedErrors1 = 
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
 mockTime.milliseconds())
@@ -1013,20 +649,18 @@ class AutoTopicCreationManagerTest {
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, shortTtlMs)
 
     // Verify a second request was sent (retry allowed after back-off expires)
-    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(2, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal twice")
   }
 
   @Test
   def testInflightTopicsAreNotRetriedConcurrently(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
     val topics = Map(
@@ -1035,31 +669,31 @@ class AutoTopicCreationManagerTest {
     val requestContext = initializeRequestContextWithUserPrincipal()
     val timeoutMs = 5000L
 
+    // Use a future that doesn't complete immediately to simulate in-flight 
state
+    val future = new CompletableFuture[CreateTopicsResponse]()
+    topicCreator.setFutureForWithPrincipal(future)
+
     // First call - should send request and mark topic as in-flight
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
 
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
 
     // Second concurrent call - should NOT send request because topic is 
in-flight
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
 
     // Verify still only one request was sent (concurrent request blocked)
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal once")
   }
 
   @Test
   def testBackoffAndInflightInteraction(): Unit = {
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config,
-      brokerToController,
       groupCoordinator,
       transactionCoordinator,
       shareCoordinator,
       mockTime,
+      topicCreator,
       topicErrorCacheCapacity = testCacheCapacity)
 
     val topics = Map(
@@ -1070,56 +704,45 @@ class AutoTopicCreationManagerTest {
     val requestContext = initializeRequestContextWithUserPrincipal()
     val timeoutMs = 5000L
 
-    // Create error for backoff-topic
-    val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
-    autoTopicCreationManager.createStreamsInternalTopics(backoffOnly, 
requestContext, timeoutMs)
-
-    val argumentCaptor1 = 
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
-    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      argumentCaptor1.capture())
-
     // Simulate error response for backoff-topic
-    val createTopicsResponseData = new 
org.apache.kafka.common.message.CreateTopicsResponseData()
-    val topicResult = new 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+    val backoffResponseData = new CreateTopicsResponseData()
+    val backoffResult = new CreatableTopicResult()
       .setName("backoff-topic")
       .setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
       .setErrorMessage("Invalid replication factor")
-    createTopicsResponseData.topics().add(topicResult)
+    backoffResponseData.topics().add(backoffResult)
+    val backoffResponse = new CreateTopicsResponse(backoffResponseData)
+    topicCreator.setResponseForWithPrincipal(backoffResponse)
 
-    val createTopicsResponse = new 
CreateTopicsResponse(createTopicsResponseData)
-    val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
-    val clientResponse = new ClientResponse(header, null, null,
-      0, 0, false, null, null, createTopicsResponse)
-
-    argumentCaptor1.getValue.onComplete(clientResponse)
+    // Create error for backoff-topic
+    val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
+    autoTopicCreationManager.createStreamsInternalTopics(backoffOnly, 
requestContext, timeoutMs)
 
     // Make inflight-topic in-flight (without completing the request)
+    val inflightFuture = new CompletableFuture[CreateTopicsResponse]()
+    topicCreator.setFutureForWithPrincipal(inflightFuture)
+
     val inflightOnly = Map("inflight-topic" -> topics("inflight-topic"))
     autoTopicCreationManager.createStreamsInternalTopics(inflightOnly, 
requestContext, timeoutMs)
 
-    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
-      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
-      any(classOf[ControllerRequestCompletionHandler]))
-
     // Now attempt to create all three topics together
+    val normalResponseData = new CreateTopicsResponseData()
+    val normalResult = new CreatableTopicResult()
+      .setName("normal-topic")
+      .setErrorCode(Errors.NONE.code())
+    normalResponseData.topics().add(normalResult)
+    val normalResponse = new CreateTopicsResponse(normalResponseData)
+    topicCreator.setResponseForWithPrincipal(normalResponse)
+
     autoTopicCreationManager.createStreamsInternalTopics(topics, 
requestContext, timeoutMs)
 
-    val argumentCaptor2 = 
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
     // Total 3 requests: 1 for backoff-topic, 1 for inflight-topic, 1 for 
normal-topic only
-    Mockito.verify(brokerToController, Mockito.times(3)).sendRequest(
-      argumentCaptor2.capture(),
-      any(classOf[ControllerRequestCompletionHandler]))
+    assertEquals(3, topicCreator.withPrincipalCallCount, "Should have called 
createTopicWithPrincipal 3 times")
 
     // Verify that only normal-topic was included in the last request
-    val lastRequest = 
argumentCaptor2.getValue.asInstanceOf[EnvelopeRequest.Builder]
-      .build(ApiKeys.ENVELOPE.latestVersion())
-    val forwardedRequestBuffer = lastRequest.requestData().duplicate()
-    val requestHeader = RequestHeader.parse(forwardedRequestBuffer)
-    val parsedRequest = CreateTopicsRequest.parse(new 
org.apache.kafka.common.protocol.ByteBufferAccessor(forwardedRequestBuffer),
-      requestHeader.apiVersion())
-
-    val topicNames = parsedRequest.data().topics().asScala.map(_.name()).toSet
+    val calls = topicCreator.getWithPrincipalCalls
+    val lastRequest = calls(2)._2.build()
+    val topicNames = lastRequest.data().topics().asScala.map(_.name()).toSet
     assertEquals(1, topicNames.size, "Only normal-topic should be created")
     assertTrue(topicNames.contains("normal-topic"), "normal-topic should be in 
the request")
     assertTrue(!topicNames.contains("backoff-topic"), "backoff-topic should be 
filtered (in back-off)")

Reply via email to