This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 427b409cf44 MINOR: Refactor auto topic creation to separate envelope
logic (#21272)
427b409cf44 is described below
commit 427b409cf440f745ad6195673d3342f6bd3974d4
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Jan 16 09:46:12 2026 +0100
MINOR: Refactor auto topic creation to separate envelope logic (#21272)
This commit introduces a TopicCreator interface to abstract topic
creation logic, making the code more modular and testable. The main
changes include:
- Implemented KRaftTopicCreator for KRaft-based topic creation in Java
and removed the corresponding logic from DefaultAutoTopicCreationManager
- Use a mock instance of TopicCreator in AutoTopicCreationManagerTest to
better test without Mockito's ArgumentCaptor complexity.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/java/kafka/server/KRaftTopicCreator.java | 167 ++++
core/src/main/java/kafka/server/TopicCreator.java | 56 ++
.../kafka/server/AutoTopicCreationManager.scala | 244 ++----
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +-
.../java/kafka/server/KRaftTopicCreatorTest.java | 572 ++++++++++++++
.../server/AutoTopicCreationManagerTest.scala | 845 ++++++---------------
6 files changed, 1090 insertions(+), 798 deletions(-)
diff --git a/core/src/main/java/kafka/server/KRaftTopicCreator.java
b/core/src/main/java/kafka/server/KRaftTopicCreator.java
new file mode 100644
index 00000000000..6cb0c63cbcb
--- /dev/null
+++ b/core/src/main/java/kafka/server/KRaftTopicCreator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * KRaft implementation of TopicCreator that forwards CreateTopics requests to
the controller.
+ * When creating topics with a principal, requests are wrapped in an envelope
to preserve the
+ * original request context for authorization.
+ */
+public class KRaftTopicCreator implements TopicCreator {
+
+ private final NodeToControllerChannelManager channelManager;
+
+ public KRaftTopicCreator(NodeToControllerChannelManager channelManager) {
+ this.channelManager = channelManager;
+ }
+
+ @Override
+ public CompletableFuture<CreateTopicsResponse> createTopicWithPrincipal(
+ RequestContext requestContext,
+ CreateTopicsRequest.Builder createTopicsRequest
+ ) {
+ CompletableFuture<CreateTopicsResponse> responseFuture = new
CompletableFuture<>();
+
+ short requestVersion = channelManager.controllerApiVersions()
+ .map(v -> v.latestUsableVersion(ApiKeys.CREATE_TOPICS))
+ .orElse(ApiKeys.CREATE_TOPICS.latestVersion());
+
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.CREATE_TOPICS,
+ requestVersion,
+ requestContext.clientId(),
+ requestContext.correlationId()
+ );
+
+ AbstractRequest.Builder<? extends AbstractRequest> envelopeRequest =
ForwardingManager$.MODULE$.buildEnvelopeRequest(
+ requestContext,
+ createTopicsRequest.build(requestHeader.apiVersion())
+ .serializeWithHeader(requestHeader)
+ );
+
+ ControllerRequestCompletionHandler handler = new
ControllerRequestCompletionHandler() {
+ @Override
+ public void onTimeout() {
+ responseFuture.completeExceptionally(
+ new TimeoutException("CreateTopicsRequest to controller
timed out")
+ );
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ if (response.authenticationException() != null) {
+
responseFuture.completeExceptionally(response.authenticationException());
+ } else if (response.versionMismatch() != null) {
+
responseFuture.completeExceptionally(response.versionMismatch());
+ } else if (response.wasDisconnected()) {
+ responseFuture.completeExceptionally(new
IOException("Disconnected before receiving CreateTopicsResponse"));
+ } else if (response.hasResponse()) {
+ if (response.responseBody() instanceof EnvelopeResponse
envelopeResponse) {
+ Errors envelopeError = envelopeResponse.error();
+ if (envelopeError != Errors.NONE) {
+
responseFuture.completeExceptionally(envelopeError.exception());
+ } else {
+ try {
+ CreateTopicsResponse createTopicsResponse =
(CreateTopicsResponse) AbstractResponse.parseResponse(
+ envelopeResponse.responseData(),
+ requestHeader
+ );
+ responseFuture.complete(createTopicsResponse);
+ } catch (Exception e) {
+ responseFuture.completeExceptionally(e);
+ }
+ }
+ } else {
+ responseFuture.completeExceptionally(
+ new IllegalStateException("Expected
EnvelopeResponse but got: " +
+
response.responseBody().getClass().getSimpleName())
+ );
+ }
+ } else {
+ responseFuture.completeExceptionally(
+ new IllegalStateException("Got no response body for
EnvelopeResponse")
+ );
+ }
+ }
+ };
+
+ channelManager.sendRequest(envelopeRequest, handler);
+ return responseFuture;
+ }
+
+ @Override
+ public CompletableFuture<CreateTopicsResponse> createTopicWithoutPrincipal(
+ CreateTopicsRequest.Builder createTopicsRequest
+ ) {
+ CompletableFuture<CreateTopicsResponse> responseFuture = new
CompletableFuture<>();
+
+ ControllerRequestCompletionHandler handler = new
ControllerRequestCompletionHandler() {
+ @Override
+ public void onTimeout() {
+ responseFuture.completeExceptionally(
+ new TimeoutException("CreateTopicsRequest to controller
timed out")
+ );
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ if (response.authenticationException() != null) {
+
responseFuture.completeExceptionally(response.authenticationException());
+ } else if (response.versionMismatch() != null) {
+
responseFuture.completeExceptionally(response.versionMismatch());
+ } else if (response.wasDisconnected()) {
+ responseFuture.completeExceptionally(new
IOException("Disconnected before receiving CreateTopicsResponse"));
+ } else if (response.hasResponse()) {
+ if (response.responseBody() instanceof
CreateTopicsResponse createTopicsResponse) {
+ responseFuture.complete(createTopicsResponse);
+ } else {
+ responseFuture.completeExceptionally(
+ new IllegalStateException("Expected
CreateTopicsResponse but got: " +
+
response.responseBody().getClass().getSimpleName())
+ );
+ }
+ } else {
+ responseFuture.completeExceptionally(
+ new IllegalStateException("Got no response body for
CreateTopicsRequest")
+ );
+ }
+ }
+ };
+
+ channelManager.sendRequest(createTopicsRequest, handler);
+ return responseFuture;
+ }
+
+}
diff --git a/core/src/main/java/kafka/server/TopicCreator.java
b/core/src/main/java/kafka/server/TopicCreator.java
new file mode 100644
index 00000000000..236de0f0900
--- /dev/null
+++ b/core/src/main/java/kafka/server/TopicCreator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction for creating topics via the controller.
+ * Allows different implementations to be used interchangeably
+ * by the AutoTopicCreationManager, enabling better separation of concerns and
testability.
+ */
+public interface TopicCreator {
+
+ /**
+ * Send a create topics request with principal for user-initiated topic
creation.
+ * The request context is used to preserve the original client principal
for auditing.
+ *
+ * @param requestContext The request context containing the client
principal.
+ * @param createTopicsRequest The topics to be created.
+ * @return A future of the create topics response. This future will be
completed on the network thread.
+ */
+ CompletableFuture<CreateTopicsResponse> createTopicWithPrincipal(
+ RequestContext requestContext,
+ CreateTopicsRequest.Builder createTopicsRequest
+ );
+
+ /**
+ * Send a create topics request without principal for internal topic
creation (e.g., consumer offsets, transaction state).
+ * No request context is required since these are system-initiated
requests.
+ *
+ * @param createTopicsRequest The topics to be created.
+ * @return A future of the create topics response. This future will be
completed on the network thread.
+ */
+ CompletableFuture<CreateTopicsResponse> createTopicWithoutPrincipal(
+ CreateTopicsRequest.Builder createTopicsRequest
+ );
+}
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 5ea254fbe7a..f66aa7d3c1a 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -22,25 +22,22 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Properties}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
-import org.apache.kafka.clients.ClientResponse
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{AuthenticationException,
InvalidTopicException, TimeoutException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.CreateTopicsRequestData
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicConfig, CreatableTopicConfigCollection}
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractResponse,
CreateTopicsRequest, CreateTopicsResponse, EnvelopeResponse, RequestContext,
RequestHeader}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{CreateTopicsRequest,
CreateTopicsResponse, RequestContext}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.quota.ControllerMutationQuota
import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional
trait AutoTopicCreationManager {
@@ -134,11 +131,11 @@ private[server] class ExpiringErrorCache(maxSize: Int,
time: Time) {
class DefaultAutoTopicCreationManager(
config: KafkaConfig,
- channelManager: NodeToControllerChannelManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
shareCoordinator: ShareCoordinator,
time: Time,
+ topicCreator: TopicCreator,
topicErrorCacheCapacity: Int = 1000
) extends AutoTopicCreationManager with Logging {
@@ -193,7 +190,7 @@ class DefaultAutoTopicCreationManager(
}
if (topicsToCreate.nonEmpty) {
- sendCreateTopicRequestWithErrorCaching(topicsToCreate,
Some(requestContext), timeoutMs)
+ sendCreateTopicRequestWithErrorCaching(topicsToCreate, requestContext,
timeoutMs)
}
}
@@ -208,99 +205,31 @@ class DefaultAutoTopicCreationManager(
creatableTopics: Map[String, CreatableTopic],
requestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
- val topicsToCreate = new
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
- topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
-
- val createTopicsRequest = new CreateTopicsRequest.Builder(
- new CreateTopicsRequestData()
- .setTimeoutMs(config.requestTimeoutMs)
- .setTopics(topicsToCreate)
- )
+ val createTopicsRequest: CreateTopicsRequest.Builder =
makeCreateTopicsRequestBuilder(creatableTopics)
- // Capture request header information for proper envelope response parsing
- val requestHeaderForParsing = requestContext.map { context =>
- val requestVersion =
- channelManager.controllerApiVersions.toScala match {
- case None =>
- ApiKeys.CREATE_TOPICS.latestVersion()
- case Some(nodeApiVersions) =>
- nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
- }
-
- new RequestHeader(ApiKeys.CREATE_TOPICS,
- requestVersion,
- context.clientId,
- context.correlationId)
+ val responseFuture = requestContext match {
+ case Some(context) => topicCreator.createTopicWithPrincipal(context,
createTopicsRequest)
+ case None =>
topicCreator.createTopicWithoutPrincipal(createTopicsRequest)
}
- val requestCompletionHandler = new ControllerRequestCompletionHandler {
- override def onTimeout(): Unit = {
+ responseFuture.whenComplete {
+ (response, throwable) =>
clearInflightRequests(creatableTopics)
- debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
- }
-
- override def onComplete(response: ClientResponse): Unit = {
- clearInflightRequests(creatableTopics)
- if (response.authenticationException() != null) {
- warn(s"Auto topic creation failed for ${creatableTopics.keys} with
authentication exception")
- } else if (response.versionMismatch() != null) {
- warn(s"Auto topic creation failed for ${creatableTopics.keys} with
invalid version exception")
- } else {
- if (response.hasResponse) {
- response.responseBody() match {
- case envelopeResponse: EnvelopeResponse =>
- // Unwrap the envelope response to get the actual
CreateTopicsResponse
- val envelopeError = envelopeResponse.error()
- if (envelopeError != Errors.NONE) {
- warn(s"Auto topic creation failed for
${creatableTopics.keys} with envelope error: ${envelopeError}")
- } else {
- requestHeaderForParsing match {
- case Some(requestHeader) =>
- try {
- // Use the captured request header for proper envelope
response parsing
- val createTopicsResponse =
AbstractResponse.parseResponse(
- envelopeResponse.responseData(),
requestHeader).asInstanceOf[CreateTopicsResponse]
-
-
createTopicsResponse.data().topics().forEach(topicResult => {
- val error = Errors.forCode(topicResult.errorCode)
- if (error != Errors.NONE) {
- warn(s"Auto topic creation failed for
${topicResult.name} with error '${error.name}': ${topicResult.errorMessage}")
- }
- })
- } catch {
- case e: Exception =>
- warn(s"Failed to parse envelope response for auto
topic creation of ${creatableTopics.keys}", e)
- }
- case None =>
- warn(s"Cannot parse envelope response without original
request header information")
- }
- }
- case createTopicsResponse: CreateTopicsResponse =>
- createTopicsResponse.data().topics().forEach(topicResult => {
- val error = Errors.forCode(topicResult.errorCode)
- if (error != Errors.NONE) {
- warn(s"Auto topic creation failed for ${topicResult.name}
with error '${error.name}': ${topicResult.errorMessage}")
- }
- })
- case other =>
- warn(s"Auto topic creation request received unexpected
response type: ${other.getClass.getSimpleName}")
+ // Log any errors from the topic creation attempt
+ if (throwable != null) {
+ logError(creatableTopics, throwable)
+ } else if (response != null) {
+ response.data().topics().forEach(topicResult => {
+ val error = Errors.forCode(topicResult.errorCode)
+ if (error != Errors.NONE) {
+ warn(s"Auto topic creation failed for ${topicResult.name} with
error '${error.name}': ${topicResult.errorMessage}")
}
- }
- debug(s"Auto topic creation completed for ${creatableTopics.keys}
with response ${response.responseBody}.")
+ })
+ } else {
+ warn("CreateTopicsResponse future completed with null response and
no exception")
}
- }
- }
-
- val request = (requestContext, requestHeaderForParsing) match {
- case (Some(context), Some(requestHeader)) =>
- ForwardingManager.buildEnvelopeRequest(context,
-
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
- case _ =>
- createTopicsRequest
}
- channelManager.sendRequest(request, requestCompletionHandler)
-
val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
new MetadataResponseTopic()
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
@@ -401,109 +330,54 @@ class DefaultAutoTopicCreationManager(
private def sendCreateTopicRequestWithErrorCaching(
creatableTopics: Map[String, CreatableTopic],
- requestContext: Option[RequestContext],
+ requestContext: RequestContext,
timeoutMs: Long
- ): Seq[MetadataResponseTopic] = {
- val topicsToCreate = new
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
- topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
-
- val createTopicsRequest = new CreateTopicsRequest.Builder(
- new CreateTopicsRequestData()
- .setTimeoutMs(config.requestTimeoutMs)
- .setTopics(topicsToCreate)
- )
-
- // Capture request header information for proper envelope response parsing
- val requestHeaderForParsing = requestContext.map { context =>
- val requestVersion =
- channelManager.controllerApiVersions.toScala match {
- case None =>
- ApiKeys.CREATE_TOPICS.latestVersion()
- case Some(nodeApiVersions) =>
- nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
- }
-
- new RequestHeader(ApiKeys.CREATE_TOPICS,
- requestVersion,
- context.clientId,
- context.correlationId)
- }
+ ): Unit = {
+ val createTopicsRequest: CreateTopicsRequest.Builder =
makeCreateTopicsRequestBuilder(creatableTopics)
- val requestCompletionHandler = new ControllerRequestCompletionHandler {
- override def onTimeout(): Unit = {
- clearInflightRequests(creatableTopics)
- debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
- cacheTopicCreationErrors(creatableTopics.keys.toSet, "Auto topic
creation timed out.", timeoutMs)
- }
+ val createTopicsResponseFuture =
topicCreator.createTopicWithPrincipal(requestContext, createTopicsRequest)
- override def onComplete(response: ClientResponse): Unit = {
+ createTopicsResponseFuture.whenComplete {
+ (response, throwable) =>
clearInflightRequests(creatableTopics)
- if (response.authenticationException() != null) {
- val authException = response.authenticationException()
- warn(s"Auto topic creation failed for ${creatableTopics.keys} with
authentication exception: ${authException.getMessage}")
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
authException.getMessage, timeoutMs)
- } else if (response.versionMismatch() != null) {
- val versionException = response.versionMismatch()
- warn(s"Auto topic creation failed for ${creatableTopics.keys} with
version mismatch exception: ${versionException.getMessage}")
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
versionException.getMessage, timeoutMs)
+ // Log any errors from the topic creation attempt
+ if (throwable != null) {
+ logError(creatableTopics, throwable)
+ val errorMessage =
Option(throwable.getMessage).getOrElse(throwable.toString)
+ cacheTopicCreationErrors(creatableTopics.keys.toSet, errorMessage,
timeoutMs)
+ } else if (response != null) {
+ debug(s"Auto topic creation completed for ${creatableTopics.keys}
with response $response.")
+ cacheTopicCreationErrorsFromResponse(response, timeoutMs)
} else {
- if (response.hasResponse) {
- response.responseBody() match {
- case envelopeResponse: EnvelopeResponse =>
- // Unwrap the envelope response to get the actual
CreateTopicsResponse
- val envelopeError = envelopeResponse.error()
- if (envelopeError != Errors.NONE) {
- warn(s"Auto topic creation failed for
${creatableTopics.keys} with envelope error: ${envelopeError}")
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
s"Envelope error: ${envelopeError}", timeoutMs)
- } else {
- requestHeaderForParsing match {
- case Some(requestHeader) =>
- try {
- // Use the captured request header for proper envelope
response parsing
- val createTopicsResponse =
AbstractResponse.parseResponse(
- envelopeResponse.responseData(),
requestHeader).asInstanceOf[CreateTopicsResponse]
-
-
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
- } catch {
- case e: Exception =>
- warn(s"Failed to parse envelope response for auto
topic creation of ${creatableTopics.keys}", e)
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
s"Response parsing error: ${e.getMessage}", timeoutMs)
- }
- case None =>
- warn(s"Cannot parse envelope response without original
request header information")
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
"Missing request header for envelope parsing", timeoutMs)
- }
- }
- case createTopicsResponse: CreateTopicsResponse =>
- cacheTopicCreationErrorsFromResponse(createTopicsResponse,
timeoutMs)
- case unexpectedResponse =>
- warn(s"Auto topic creation request received unexpected
response type: ${unexpectedResponse.getClass.getSimpleName}")
- cacheTopicCreationErrors(creatableTopics.keys.toSet,
s"Unexpected response type: ${unexpectedResponse.getClass.getSimpleName}",
timeoutMs)
- }
- debug(s"Auto topic creation completed for ${creatableTopics.keys}
with response ${response.responseBody}.")
- }
+ val ex = new IllegalStateException("CreateTopicsResponse future
completed with null response and no exception")
+ error(s"Auto topic creation failed for ${creatableTopics.keys} due
to unexpected future completion state", ex)
+ cacheTopicCreationErrors(creatableTopics.keys.toSet, ex.getMessage,
timeoutMs)
}
- }
}
+ }
- val request = (requestContext, requestHeaderForParsing) match {
- case (Some(context), Some(requestHeader)) =>
- ForwardingManager.buildEnvelopeRequest(context,
-
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
- case _ =>
- createTopicsRequest
+ private def logError(creatableTopics: Map[String, CreatableTopic],
throwable: Throwable): Unit = {
+ throwable match {
+ case _: TimeoutException =>
+ debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
+ case _: AuthenticationException =>
+ warn(s"Auto topic creation failed for ${creatableTopics.keys} with
authentication exception")
+ case _: UnsupportedVersionException =>
+ warn(s"Auto topic creation failed for ${creatableTopics.keys} with
invalid version exception")
+ case other =>
+ warn(s"Auto topic creation failed for ${creatableTopics.keys} with
exception", other)
}
+ }
- channelManager.sendRequest(request, requestCompletionHandler)
-
- val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
- new MetadataResponseTopic()
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- .setName(topic)
- .setIsInternal(Topic.isInternal(topic))
- }
+ private def makeCreateTopicsRequestBuilder(creatableTopics: Map[String,
CreatableTopic]): CreateTopicsRequest.Builder = {
+ val topicsToCreate = new
CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
+ topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
- creatableTopicResponses
+ new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTimeoutMs(config.requestTimeoutMs)
+ .setTopics(topicsToCreate)
+ )
}
private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage:
String, ttlMs: Long): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 67ea88b37fb..c173a9aad04 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -392,9 +392,9 @@ class BrokerServer(
new KafkaScheduler(1, true, "transaction-log-manager-"),
producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
+ val topicCreator = new
KRaftTopicCreator(clientToControllerChannelManager)
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config, clientToControllerChannelManager, groupCoordinator,
- transactionCoordinator, shareCoordinator, time)
+ config, groupCoordinator, transactionCoordinator, shareCoordinator,
time, topicCreator)
dynamicConfigHandlers = Map[ConfigType, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config,
quotaManagers),
diff --git a/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java
b/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java
new file mode 100644
index 00000000000..0a749bde44e
--- /dev/null
+++ b/core/src/test/java/kafka/server/KRaftTopicCreatorTest.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class KRaftTopicCreatorTest {
+
+ private static final int REQUEST_TIMEOUT = 100;
+
+ private NodeToControllerChannelManager brokerToController;
+ private KRaftTopicCreator kraftTopicCreator;
+
+ @BeforeEach
+ public void setup() {
+ brokerToController = mock(NodeToControllerChannelManager.class);
+
+ ApiVersionsResponseData.ApiVersion createTopicApiVersion = new
ApiVersionsResponseData.ApiVersion()
+ .setApiKey(ApiKeys.CREATE_TOPICS.id)
+ .setMinVersion(ApiKeys.CREATE_TOPICS.oldestVersion())
+ .setMaxVersion(ApiKeys.CREATE_TOPICS.latestVersion());
+
+ when(brokerToController.controllerApiVersions())
+
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))));
+
+ kraftTopicCreator = new KRaftTopicCreator(brokerToController);
+ }
+
+ @Test
+ public void testCreateTopicWithMetadataContextPassPrincipal() throws
Exception {
+ String topicName = "topic";
+ KafkaPrincipal userPrincipal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user");
+ AtomicBoolean serializeIsCalled = new AtomicBoolean(false);
+
+ KafkaPrincipalSerde principalSerde = new KafkaPrincipalSerde() {
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ assertEquals(principal, userPrincipal);
+ serializeIsCalled.set(true);
+ return Utils.utf8(principal.toString());
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+ }
+ };
+
+ RequestContext requestContext =
initializeRequestContext(userPrincipal, Optional.of(principalSerde));
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ assertTrue(serializeIsCalled.get());
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<AbstractRequest.Builder<? extends AbstractRequest>>
argumentCaptor =
+ (ArgumentCaptor<AbstractRequest.Builder<? extends
AbstractRequest>>) (ArgumentCaptor<?>)
ArgumentCaptor.forClass(AbstractRequest.Builder.class);
+ verify(brokerToController).sendRequest(
+ argumentCaptor.capture(),
+ any(ControllerRequestCompletionHandler.class));
+
+ EnvelopeRequest capturedRequest = (EnvelopeRequest)
argumentCaptor.getValue()
+ .build(ApiKeys.ENVELOPE.latestVersion());
+ assertEquals(userPrincipal,
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal())));
+ }
+
+ @Test
+ public void
testCreateTopicWithMetadataContextWhenPrincipalSerdeNotDefined() {
+ String topicName = "topic";
+ RequestContext requestContext =
initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.empty());
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest));
+ }
+
+ @Test
+ public void testCreateTopicWithoutRequestContext() {
+ String topicName = "topic";
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<AbstractRequest.Builder<? extends AbstractRequest>>
argumentCaptor =
+ (ArgumentCaptor<AbstractRequest.Builder<? extends
AbstractRequest>>) (ArgumentCaptor<?>)
ArgumentCaptor.forClass(AbstractRequest.Builder.class);
+ verify(brokerToController).sendRequest(
+ argumentCaptor.capture(),
+ any(ControllerRequestCompletionHandler.class));
+
+ AbstractRequest.Builder<?> capturedRequest = argumentCaptor.getValue();
+ assertInstanceOf(CreateTopicsRequest.Builder.class, capturedRequest,
+ "Should send CreateTopicsRequest.Builder when no request context
provided");
+ }
+
+ @Test
+ public void testEnvelopeResponseSuccessfulParsing() throws Exception {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ CreateTopicsResponseData createTopicsResponseData = new
CreateTopicsResponseData();
+ CreateTopicsResponseData.CreatableTopicResult topicResult =
+ new CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.NONE.code())
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1);
+ createTopicsResponseData.topics().add(topicResult);
+
+ CreateTopicsResponse createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData);
+ short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
+ int correlationId = requestContext.correlationId();
+ String clientId = requestContext.clientId();
+
+ ResponseHeader responseHeader = new ResponseHeader(
+ correlationId,
+ ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion)
+ );
+ ByteBuffer serializedResponse = RequestUtils.serialize(
+ responseHeader.data(),
+ responseHeader.headerVersion(),
+ createTopicsResponse.data(),
+ requestVersion
+ );
+
+ EnvelopeResponse envelopeResponse = new
EnvelopeResponse(serializedResponse, Errors.NONE);
+ RequestHeader requestHeader = new RequestHeader(ApiKeys.ENVELOPE,
(short) 0, clientId, correlationId);
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
envelopeResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ CreateTopicsResponse result = responseFuture.get();
+ assertEquals(1, result.data().topics().size());
+ assertEquals(topicName,
result.data().topics().iterator().next().name());
+ assertEquals(Errors.NONE.code(),
result.data().topics().iterator().next().errorCode());
+ }
+
+ @Test
+ public void testEnvelopeResponseWithEnvelopeError() {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ EnvelopeResponse envelopeResponse = new
EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION);
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(),
requestContext.correlationId()
+ );
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
envelopeResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ assertThrows(ExecutionException.class, responseFuture::get);
+ assertTrue(responseFuture.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testEnvelopeResponseParsingException() {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ ByteBuffer malformedData = ByteBuffer.wrap("invalid response
data".getBytes());
+ EnvelopeResponse envelopeResponse = new
EnvelopeResponse(malformedData, Errors.NONE);
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(),
requestContext.correlationId()
+ );
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
envelopeResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+ assertTrue(responseFuture.isCompletedExceptionally());
+ ExecutionException exception = assertThrows(ExecutionException.class,
responseFuture::get);
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+ }
+
+ @Test
+ public void testEnvelopeResponseWithTopicErrors() throws Exception {
+ String topicName1 = "test-topic-1";
+ String topicName2 = "test-topic-2";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+
+ CreateTopicsRequestData.CreatableTopicCollection topicsCollection =
+ new CreateTopicsRequestData.CreatableTopicCollection();
+ topicsCollection.add(
+ new CreateTopicsRequestData.CreatableTopic()
+ .setName(topicName1)
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1)
+ );
+ topicsCollection.add(
+ new CreateTopicsRequestData.CreatableTopic()
+ .setName(topicName2)
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1)
+ );
+ CreateTopicsRequest.Builder createTopicsRequest = new
CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTopics(topicsCollection)
+ .setTimeoutMs(REQUEST_TIMEOUT)
+ );
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ CreateTopicsResponseData createTopicsResponseData = new
CreateTopicsResponseData();
+
+ CreateTopicsResponseData.CreatableTopicResult successResult =
+ new CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topicName1)
+ .setErrorCode(Errors.NONE.code())
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1);
+ createTopicsResponseData.topics().add(successResult);
+
+ CreateTopicsResponseData.CreatableTopicResult errorResult =
+ new CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topicName2)
+ .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
+ .setErrorMessage("Topic already exists");
+ createTopicsResponseData.topics().add(errorResult);
+
+ CreateTopicsResponse createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData);
+ short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
+ int correlationId = requestContext.correlationId();
+ String clientId = requestContext.clientId();
+
+ ResponseHeader responseHeader = new ResponseHeader(
+ correlationId,
+ ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion)
+ );
+ ByteBuffer serializedResponse = RequestUtils.serialize(
+ responseHeader.data(),
+ responseHeader.headerVersion(),
+ createTopicsResponse.data(),
+ requestVersion
+ );
+
+ EnvelopeResponse envelopeResponse = new
EnvelopeResponse(serializedResponse, Errors.NONE);
+ RequestHeader requestHeader = new RequestHeader(ApiKeys.ENVELOPE,
(short) 0, clientId, correlationId);
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
envelopeResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ CreateTopicsResponse result = responseFuture.get();
+ assertEquals(2, result.data().topics().size());
+ Map<String, CreateTopicsResponseData.CreatableTopicResult> results =
result.data().topics().stream()
+ .collect(Collectors.toMap(
+ CreateTopicsResponseData.CreatableTopicResult::name,
+ t -> t
+ ));
+ assertEquals(Errors.NONE.code(), results.get(topicName1).errorCode());
+ assertEquals(Errors.TOPIC_ALREADY_EXISTS.code(),
results.get(topicName2).errorCode());
+ assertEquals("Topic already exists",
results.get(topicName2).errorMessage());
+ }
+
+ @Test
+ public void testTimeoutException() {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ argumentCaptor.getValue().onTimeout();
+
+ ExecutionException exception = assertThrows(ExecutionException.class,
responseFuture::get);
+ assertInstanceOf(TimeoutException.class, exception.getCause());
+ assertTrue(responseFuture.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testAuthenticationException() {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(),
requestContext.correlationId()
+ );
+ AuthenticationException authException = new
AuthenticationException("Authentication failed");
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, authException, null
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ ExecutionException exception = assertThrows(ExecutionException.class,
responseFuture::get);
+ assertInstanceOf(AuthenticationException.class, exception.getCause());
+ assertTrue(responseFuture.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testVersionMismatchException() {
+ String topicName = "test-topic";
+ RequestContext requestContext =
initializeRequestContextWithUserPrincipal();
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithPrincipal(requestContext,
createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.ENVELOPE, (short) 0, requestContext.clientId(),
requestContext.correlationId()
+ );
+ UnsupportedVersionException versionMismatch = new
UnsupportedVersionException("Version mismatch");
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, versionMismatch, null, null
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ ExecutionException exception = assertThrows(ExecutionException.class,
responseFuture::get);
+ assertInstanceOf(UnsupportedVersionException.class,
exception.getCause());
+ assertTrue(responseFuture.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testDirectCreateTopicsResponse() throws Exception {
+ String topicName = "test-topic";
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ CreateTopicsResponseData createTopicsResponseData = new
CreateTopicsResponseData();
+ CreateTopicsResponseData.CreatableTopicResult topicResult =
+ new CreateTopicsResponseData.CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.NONE.code())
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1);
+ createTopicsResponseData.topics().add(topicResult);
+
+ CreateTopicsResponse createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData);
+ RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
(short) 0, "client", 1);
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
createTopicsResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ CreateTopicsResponse result = responseFuture.get();
+ assertEquals(1, result.data().topics().size());
+ assertEquals(topicName,
result.data().topics().iterator().next().name());
+ }
+
+ @Test
+ public void testUnexpectedResponseType() {
+ String topicName = "test-topic";
+ CreateTopicsRequest.Builder createTopicsRequest =
createCreateTopicsRequestBuilder(topicName);
+
+ CompletableFuture<CreateTopicsResponse> responseFuture =
+ kraftTopicCreator.createTopicWithoutPrincipal(createTopicsRequest);
+
+ ArgumentCaptor<ControllerRequestCompletionHandler> argumentCaptor =
+ ArgumentCaptor.forClass(ControllerRequestCompletionHandler.class);
+ verify(brokerToController).sendRequest(
+ any(),
+ argumentCaptor.capture());
+
+ MetadataResponse unexpectedResponse = new MetadataResponse(
+ new MetadataResponseData(),
+ ApiKeys.METADATA.latestVersion()
+ );
+ RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
(short) 0, "client", 1);
+ ClientResponse clientResponse = new ClientResponse(
+ requestHeader, null, null, 0, 0, false, null, null,
unexpectedResponse
+ );
+
+ argumentCaptor.getValue().onComplete(clientResponse);
+
+ ExecutionException exception = assertThrows(ExecutionException.class,
responseFuture::get);
+ assertInstanceOf(IllegalStateException.class, exception.getCause());
+ assertTrue(responseFuture.isCompletedExceptionally());
+ }
+
+ private RequestContext initializeRequestContextWithUserPrincipal() {
+ KafkaPrincipal userPrincipal = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user");
+ KafkaPrincipalSerde principalSerde = new KafkaPrincipalSerde() {
+ @Override
+ public byte[] serialize(KafkaPrincipal principal) {
+ return Utils.utf8(principal.toString());
+ }
+
+ @Override
+ public KafkaPrincipal deserialize(byte[] bytes) {
+ return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
+ }
+ };
+ return initializeRequestContext(userPrincipal,
Optional.of(principalSerde));
+ }
+
+ private RequestContext initializeRequestContext(
+ KafkaPrincipal kafkaPrincipal,
+ Optional<KafkaPrincipalSerde> principalSerde
+ ) {
+ try {
+ RequestHeader requestHeader = new RequestHeader(
+ ApiKeys.METADATA,
+ ApiKeys.METADATA.latestVersion(),
+ "clientId",
+ 0
+ );
+ return new RequestContext(
+ requestHeader,
+ "1",
+ InetAddress.getLocalHost(),
+ Optional.empty(),
+ kafkaPrincipal,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ ClientInformation.EMPTY,
+ false,
+ principalSerde
+ );
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CreateTopicsRequest.Builder
createCreateTopicsRequestBuilder(String topicName) {
+ CreateTopicsRequestData.CreatableTopicCollection topicsCollection =
+ new CreateTopicsRequestData.CreatableTopicCollection();
+ topicsCollection.add(
+ new CreateTopicsRequestData.CreatableTopic()
+ .setName(topicName)
+ .setNumPartitions(1)
+ .setReplicationFactor((short) 1)
+ );
+ return new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTopics(topicsCollection)
+ .setTimeoutMs(REQUEST_TIMEOUT)
+ );
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index eee53de824f..1266176f63a 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -18,22 +18,20 @@
package kafka.server
import java.net.InetAddress
-import java.nio.ByteBuffer
import java.util
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Collections, Optional, Properties}
+import java.util.concurrent.CompletableFuture
+import java.util.{Optional, Properties}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
-import org.apache.kafka.clients.{ClientResponse, NodeApiVersions,
RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
-import org.apache.kafka.common.message.{ApiVersionsResponseData,
CreateTopicsRequestData}
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicConfig, CreatableTopicConfigCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData
+import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.requests.RequestUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.server.util.MockTime
@@ -42,16 +40,68 @@ import
org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorCon
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.quota.ControllerMutationQuota
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
-import org.mockito.ArgumentMatchers.any
-import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
-import org.mockito.Mockito.never
+import org.mockito.{ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Test implementation of TopicCreator that tracks method calls and allows
configuring responses.
+ */
+class TestTopicCreator extends TopicCreator {
+ private val withPrincipalCalls = ListBuffer[(RequestContext,
CreateTopicsRequest.Builder)]()
+ private val withoutPrincipalCalls = ListBuffer[CreateTopicsRequest.Builder]()
+ private var withPrincipalResponse: CompletableFuture[CreateTopicsResponse] =
_
+ private var withoutPrincipalResponse:
CompletableFuture[CreateTopicsResponse] = _
+
+ override def createTopicWithPrincipal(
+ requestContext: RequestContext,
+ request: CreateTopicsRequest.Builder
+ ): CompletableFuture[CreateTopicsResponse] = {
+ withPrincipalCalls += ((requestContext, request))
+ if (withPrincipalResponse != null) withPrincipalResponse else
CompletableFuture.completedFuture(null)
+ }
+
+ override def createTopicWithoutPrincipal(
+ request: CreateTopicsRequest.Builder
+ ): CompletableFuture[CreateTopicsResponse] = {
+ withoutPrincipalCalls += request
+ if (withoutPrincipalResponse != null) withoutPrincipalResponse else
CompletableFuture.completedFuture(null)
+ }
+
+ def setResponseForWithPrincipal(response: CreateTopicsResponse): Unit = {
+ withPrincipalResponse = CompletableFuture.completedFuture(response)
+ }
+
+ def setResponseForWithoutPrincipal(response: CreateTopicsResponse): Unit = {
+ withoutPrincipalResponse = CompletableFuture.completedFuture(response)
+ }
+
+ def setFutureForWithPrincipal(future:
CompletableFuture[CreateTopicsResponse]): Unit = {
+ withPrincipalResponse = future
+ }
+
+ def setFutureForWithoutPrincipal(future:
CompletableFuture[CreateTopicsResponse]): Unit = {
+ withoutPrincipalResponse = future
+ }
+
+ def getWithPrincipalCalls: List[(RequestContext,
CreateTopicsRequest.Builder)] = withPrincipalCalls.toList
+ def getWithoutPrincipalCalls: List[CreateTopicsRequest.Builder] =
withoutPrincipalCalls.toList
+
+ def withPrincipalCallCount: Int = withPrincipalCalls.size
+ def withoutPrincipalCallCount: Int = withoutPrincipalCalls.size
+
+ def reset(): Unit = {
+ withPrincipalCalls.clear()
+ withoutPrincipalCalls.clear()
+ withPrincipalResponse = null
+ withoutPrincipalResponse = null
+ }
+}
class AutoTopicCreationManagerTest {
@@ -59,7 +109,7 @@ class AutoTopicCreationManagerTest {
private val testCacheCapacity = 3
private var config: KafkaConfig = _
private val metadataCache = Mockito.mock(classOf[MetadataCache])
- private val brokerToController =
Mockito.mock(classOf[NodeToControllerChannelManager])
+ private val topicCreator = new TestTopicCreator()
private val groupCoordinator = Mockito.mock(classOf[GroupCoordinator])
private val transactionCoordinator =
Mockito.mock(classOf[TransactionCoordinator])
private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator])
@@ -81,13 +131,12 @@ class AutoTopicCreationManagerTest {
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG,
internalTopicReplicationFactor.toString)
- // Set a short group max session timeout for testing TTL (1 second)
-
props.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
"1000")
config = KafkaConfig.fromProps(props)
val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1,
"host1", 1))
-
Mockito.when(metadataCache.getAliveBrokerNodes(any(classOf[ListenerName]))).thenReturn(aliveBrokers)
+
Mockito.when(metadataCache.getAliveBrokerNodes(ArgumentMatchers.any(classOf[ListenerName]))).thenReturn(aliveBrokers)
+ topicCreator.reset()
}
@Test
@@ -119,109 +168,86 @@ class AutoTopicCreationManagerTest {
replicationFactor: Short = 1): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
- val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
- topicsCollection.add(getNewTopic(topicName, numPartitions,
replicationFactor))
- val requestBody = new CreateTopicsRequest.Builder(
- new CreateTopicsRequestData()
- .setTopics(topicsCollection)
- .setTimeoutMs(requestTimeout))
+ // Set up the topicCreator to return a successful response
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.NONE.code())
+ createTopicsResponseData.topics().add(topicResult)
+ val response = new CreateTopicsResponse(createTopicsResponseData)
+ topicCreator.setResponseForWithoutPrincipal(response)
- // Calling twice with the same topic will only trigger one forwarding.
- createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName,
isInternal)
+ // First call to create topic - should trigger the topic creator
createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName,
isInternal)
- Mockito.verify(brokerToController).sendRequest(
- ArgumentMatchers.eq(requestBody),
- any(classOf[ControllerRequestCompletionHandler]))
- }
+ assertEquals(1, topicCreator.withoutPrincipalCallCount, "Should have
called createTopicWithoutPrincipal once")
- @Test
- def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
- val topicName = "topic"
-
- val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
- val serializeIsCalled = new AtomicBoolean(false)
- val principalSerde = new KafkaPrincipalSerde {
- override def serialize(principal: KafkaPrincipal): Array[Byte] = {
- assertEquals(principal, userPrincipal)
- serializeIsCalled.set(true)
- Utils.utf8(principal.toString)
- }
- override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
- }
+ // Reset the topicCreator to verify the second call
+ topicCreator.reset()
+ topicCreator.setResponseForWithoutPrincipal(response)
- val requestContext = initializeRequestContext(userPrincipal,
Optional.of(principalSerde))
+ // Second call - should also trigger topicCreator because inflight is
cleared after first call completes
+ createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName,
isInternal)
- autoTopicCreationManager.createTopics(
- Set(topicName),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext))
+ assertEquals(1, topicCreator.withoutPrincipalCallCount, "Should have
called createTopicWithoutPrincipal once more")
- assertTrue(serializeIsCalled.get())
+ // Verify the request builder matches expected values
+ val capturedRequest = topicCreator.getWithoutPrincipalCalls.head.build()
+ assertEquals(requestTimeout, capturedRequest.data().timeoutMs())
+ assertEquals(1, capturedRequest.data().topics().size())
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
- Mockito.verify(brokerToController).sendRequest(
- argumentCaptor.capture(),
- any(classOf[ControllerRequestCompletionHandler]))
- val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
- assertEquals(userPrincipal,
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+ // Validate request
+ val topic = capturedRequest.data().topics().iterator().next()
+ assertEquals(topicName, topic.name())
+ assertEquals(numPartitions, topic.numPartitions())
+ assertEquals(replicationFactor, topic.replicationFactor())
}
@Test
- def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit
= {
- val topicName = "topic"
-
- val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS,
Optional.empty())
-
- // Throw upon undefined principal serde when building the forward request
- assertThrows(classOf[IllegalArgumentException], () =>
autoTopicCreationManager.createTopics(
- Set(topicName),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext)))
- }
+ def testTopicCreationWithMetadataContext(): Unit = {
+ autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+ config,
+ groupCoordinator,
+ transactionCoordinator,
+ shareCoordinator,
+ mockTime,
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity)
- @Test
- def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion():
Unit = {
val topicName = "topic"
-
+ val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
val principalSerde = new KafkaPrincipalSerde {
- override def serialize(principal: KafkaPrincipal): Array[Byte] = {
- Utils.utf8(principal.toString)
- }
+ override def serialize(principal: KafkaPrincipal): Array[Byte] =
Utils.utf8(principal.toString)
override def deserialize(bytes: Array[Byte]): KafkaPrincipal =
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
- val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS,
Optional.of(principalSerde))
- autoTopicCreationManager.createTopics(
- Set(topicName),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext))
- autoTopicCreationManager.createTopics(
- Set(topicName),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext))
+ val requestContext = initializeRequestContext(userPrincipal,
Optional.of(principalSerde))
+
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
+ .setName(topicName)
+ .setErrorCode(Errors.NONE.code())
+ createTopicsResponseData.topics().add(topicResult)
+ val response = new CreateTopicsResponse(createTopicsResponseData)
+ topicCreator.setResponseForWithPrincipal(response)
- // Should only trigger once
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Complete with unsupported version will not trigger a retry, but cleanup
the inflight topics instead
- val header = new RequestHeader(ApiKeys.ENVELOPE, 0, "client", 1)
- val response = new EnvelopeResponse(ByteBuffer.allocate(0),
Errors.UNSUPPORTED_VERSION)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, response)
-
argumentCaptor.getValue.asInstanceOf[RequestCompletionHandler].onComplete(clientResponse)
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Could do the send again as inflight topics are cleared.
autoTopicCreationManager.createTopics(
Set(topicName),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext))
- Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
+
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
+ val calls = topicCreator.getWithPrincipalCalls
+ assertEquals(requestContext, calls.head._1)
+
+ val capturedRequest = calls.head._2.build()
+ assertEquals(1, capturedRequest.data().topics().size())
+ assertEquals(topicName,
capturedRequest.data().topics().iterator().next().name())
}
@Test
@@ -237,35 +263,38 @@ class AutoTopicCreationManagerTest {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity
+ )
+
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new CreatableTopicResult()
+ .setName("stream-topic-1")
+ .setErrorCode(Errors.NONE.code()))
+ createTopicsResponseData.topics().add(
+ new CreatableTopicResult()
+ .setName("stream-topic-2")
+ .setErrorCode(Errors.NONE.code()))
+ val response = new CreateTopicsResponse(createTopicsResponseData)
+ topicCreator.setResponseForWithPrincipal(response)
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
- Mockito.verify(brokerToController).sendRequest(
- argumentCaptor.capture(),
- any(classOf[ControllerRequestCompletionHandler]))
-
- val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
ApiKeys.CREATE_TOPICS.latestVersion(), "clientId", 0)
- val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
- val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
- topicsCollection.add(getNewTopic("stream-topic-1", 3,
2.toShort).setConfigs(topicConfig))
- topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort))
- val requestBody = new CreateTopicsRequest.Builder(
- new CreateTopicsRequestData()
- .setTopics(topicsCollection)
- .setTimeoutMs(requestTimeout))
- .build(ApiKeys.CREATE_TOPICS.latestVersion())
-
- val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
- assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer))
- assertEquals(requestBody.data(), CreateTopicsRequest.parse(new
ByteBufferAccessor(forwardedRequestBuffer),
- ApiKeys.CREATE_TOPICS.latestVersion()).data())
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
+ val calls = topicCreator.getWithPrincipalCalls
+ assertEquals(requestContext, calls.head._1)
+
+ val capturedRequest = calls.head._2.build()
+ assertEquals(requestTimeout, capturedRequest.data().timeoutMs())
+ assertEquals(2, capturedRequest.data().topics().size())
+ val topicNames =
capturedRequest.data().topics().asScala.map(_.name()).toSet
+ assertTrue(topicNames.contains("stream-topic-1"))
+ assertTrue(topicNames.contains("stream-topic-2"))
}
@Test
@@ -275,22 +304,20 @@ class AutoTopicCreationManagerTest {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
- Mockito.verify(brokerToController, never()).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(0, topicCreator.withPrincipalCallCount, "Should not have
called createTopicWithPrincipal")
}
@Test
- def testCreateStreamsInternalTopicsPassesPrincipal(): Unit = {
+ def testCreateStreamsInternalTopicsPassesRequestContext(): Unit = {
val topics = Map(
"stream-topic-1" -> new
CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
)
@@ -298,21 +325,27 @@ class AutoTopicCreationManagerTest {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity
+ )
+
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ createTopicsResponseData.topics().add(
+ new CreatableTopicResult()
+ .setName("stream-topic-1")
+ .setErrorCode(Errors.NONE.code()))
+ val response = new CreateTopicsResponse(createTopicsResponseData)
+ topicCreator.setResponseForWithPrincipal(response)
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
- Mockito.verify(brokerToController).sendRequest(
- argumentCaptor.capture(),
- any(classOf[ControllerRequestCompletionHandler]))
- val capturedRequest =
argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
- assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"),
SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
+ val calls = topicCreator.getWithPrincipalCalls
+ assertEquals(requestContext, calls.head._1)
}
private def initializeRequestContextWithUserPrincipal(): RequestContext = {
@@ -328,23 +361,6 @@ class AutoTopicCreationManagerTest {
private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal,
principalSerde:
Optional[KafkaPrincipalSerde]): RequestContext = {
-
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
- .setApiKey(ApiKeys.CREATE_TOPICS.id)
- .setMinVersion(ApiKeys.CREATE_TOPICS.oldestVersion())
- .setMaxVersion(ApiKeys.CREATE_TOPICS.latestVersion())
- Mockito.when(brokerToController.controllerApiVersions())
-
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
-
val requestHeader = new RequestHeader(ApiKeys.METADATA,
ApiKeys.METADATA.latestVersion,
"clientId", 0)
new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
Optional.empty(),
@@ -367,51 +383,35 @@ class AutoTopicCreationManagerTest {
assertEquals(expectedResponses, topicResponses)
}
- private def getNewTopic(topicName: String, numPartitions: Int,
replicationFactor: Short): CreatableTopic = {
- new CreatableTopic()
- .setName(topicName)
- .setNumPartitions(numPartitions)
- .setReplicationFactor(replicationFactor)
- }
-
@Test
def testTopicCreationErrorCaching(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity
+ )
val topics = Map(
"test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
// Simulate a CreateTopicsResponse with errors
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
.setName("test-topic-1")
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
.setErrorMessage("Topic 'test-topic-1' already exists.")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
// Verify that the error was cached
val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"),
mockTime.milliseconds())
@@ -424,45 +424,38 @@ class AutoTopicCreationManagerTest {
def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity
+ )
val topics = Map(
"success-topic" -> new
CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
"failed-topic" -> new
CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
// Simulate mixed response - one success, one failure
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
+ val createTopicsResponseData = new CreateTopicsResponseData()
createTopicsResponseData.topics().add(
- new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ new CreatableTopicResult()
.setName("success-topic")
.setErrorCode(Errors.NONE.code())
)
createTopicsResponseData.topics().add(
- new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ new CreatableTopicResult()
.setName("failed-topic")
.setErrorCode(Errors.POLICY_VIOLATION.code())
.setErrorMessage("Policy violation")
)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
- argumentCaptor.getValue.onComplete(clientResponse)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
// Only the failed topic should be cached
val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic",
"failed-topic", "nonexistent-topic"), mockTime.milliseconds())
@@ -475,13 +468,13 @@ class AutoTopicCreationManagerTest {
def testErrorCacheTTL(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
+ topicCreator,
+ topicErrorCacheCapacity = testCacheCapacity
+ )
// First cache an error by simulating topic creation failure
val topics = Map(
@@ -489,28 +482,19 @@ class AutoTopicCreationManagerTest {
)
val requestContext = initializeRequestContextWithUserPrincipal()
val shortTtlMs = 1000L // Use 1 second TTL for faster testing
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
// Simulate a CreateTopicsResponse with error
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
.setErrorMessage("Invalid replication factor")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
- // Cache the error at T0
- argumentCaptor.getValue.onComplete(clientResponse)
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
// Verify error is cached and accessible within TTL
val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
@@ -525,334 +509,16 @@ class AutoTopicCreationManagerTest {
assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively
cleaned up")
}
- @Test
- def testEnvelopeResponseSuccessfulParsing(): Unit = {
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val topics = Map(
- "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
- )
- val requestContext = initializeRequestContextWithUserPrincipal()
- val timeoutMs = 5000L
-
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create a successful CreateTopicsResponse
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
- .setName("test-topic")
- .setErrorCode(Errors.NONE.code())
- .setNumPartitions(1)
- .setReplicationFactor(1.toShort)
- createTopicsResponseData.topics().add(topicResult)
-
- val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
- val correlationId = requestContext.correlationId // Use the actual
correlation ID from request context
- val clientId = requestContext.clientId
-
- // Serialize the CreateTopicsResponse with header as it would appear in an
envelope
- val responseHeader = new ResponseHeader(correlationId,
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
- val serializedResponse = RequestUtils.serialize(responseHeader.data(),
responseHeader.headerVersion(),
-
createTopicsResponse.data(), requestVersion)
-
- // Create an EnvelopeResponse containing the serialized
CreateTopicsResponse
- val envelopeResponse = new EnvelopeResponse(serializedResponse,
Errors.NONE)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId,
correlationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // Verify no errors were cached (successful response)
- val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
- assertTrue(cachedErrors.isEmpty, "No errors should be cached for
successful response")
- }
-
- @Test
- def testEnvelopeResponseWithEnvelopeError(): Unit = {
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val topics = Map(
- "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
- )
- val requestContext = initializeRequestContextWithUserPrincipal()
- val timeoutMs = 5000L
-
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create an EnvelopeResponse with an envelope-level error
- val envelopeResponse = new EnvelopeResponse(ByteBuffer.allocate(0),
Errors.UNSUPPORTED_VERSION)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0,
requestContext.clientId, requestContext.correlationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // Verify the envelope error was cached
- val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
- assertEquals(1, cachedErrors.size)
- assertTrue(cachedErrors("test-topic").contains("Envelope error:
UNSUPPORTED_VERSION"))
- }
-
- @Test
- def testEnvelopeResponseParsingException(): Unit = {
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val topics = Map(
- "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
- )
- val requestContext = initializeRequestContextWithUserPrincipal()
- val timeoutMs = 5000L
-
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create an EnvelopeResponse with malformed response data that will cause
parsing to fail
- val malformedData = ByteBuffer.wrap("invalid response data".getBytes())
- val envelopeResponse = new EnvelopeResponse(malformedData, Errors.NONE)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0,
requestContext.clientId, requestContext.correlationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // Verify the parsing error was cached
- val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
- assertEquals(1, cachedErrors.size)
- assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
- }
-
- @Test
- def testEnvelopeResponseCorrelationIdMismatch(): Unit = {
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val topics = Map(
- "test-topic" -> new
CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
- )
- val requestContext = initializeRequestContextWithUserPrincipal()
- val timeoutMs = 5000L
-
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create a CreateTopicsResponse with a different correlation ID than the
request
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
- .setName("test-topic")
- .setErrorCode(Errors.NONE.code())
- createTopicsResponseData.topics().add(topicResult)
-
- val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
- val requestCorrelationId = 123
- val responseCorrelationId = 456 // Different correlation ID
- val clientId = "test-client"
-
- // Serialize the CreateTopicsResponse with mismatched correlation ID
- val responseHeader = new ResponseHeader(responseCorrelationId,
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
- val serializedResponse = RequestUtils.serialize(responseHeader.data(),
responseHeader.headerVersion(),
-
createTopicsResponse.data(), requestVersion)
-
- // Create an EnvelopeResponse containing the serialized
CreateTopicsResponse
- val envelopeResponse = new EnvelopeResponse(serializedResponse,
Errors.NONE)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId,
requestCorrelationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // Verify the correlation ID mismatch error was cached
- val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
- assertEquals(1, cachedErrors.size)
- assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
- }
-
- @Test
- def testEnvelopeResponseWithTopicErrors(): Unit = {
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val topics = Map(
- "test-topic-1" -> new
CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1),
- "test-topic-2" -> new
CreatableTopic().setName("test-topic-2").setNumPartitions(1).setReplicationFactor(1)
- )
- val requestContext = initializeRequestContextWithUserPrincipal()
- val timeoutMs = 5000L
-
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create a CreateTopicsResponse with mixed success and error results
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
-
- // Successful topic
- val successResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
- .setName("test-topic-1")
- .setErrorCode(Errors.NONE.code())
- .setNumPartitions(1)
- .setReplicationFactor(1.toShort)
- createTopicsResponseData.topics().add(successResult)
-
- // Failed topic
- val errorResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
- .setName("test-topic-2")
- .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
- .setErrorMessage("Topic already exists")
- createTopicsResponseData.topics().add(errorResult)
-
- val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
- val correlationId = requestContext.correlationId // Use the actual
correlation ID from request context
- val clientId = requestContext.clientId
-
- // Serialize the CreateTopicsResponse with header
- val responseHeader = new ResponseHeader(correlationId,
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
- val serializedResponse = RequestUtils.serialize(responseHeader.data(),
responseHeader.headerVersion(),
-
createTopicsResponse.data(), requestVersion)
-
- // Create an EnvelopeResponse containing the serialized
CreateTopicsResponse
- val envelopeResponse = new EnvelopeResponse(serializedResponse,
Errors.NONE)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId,
correlationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // Verify only the failed topic was cached
- val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(
- Set("test-topic-1", "test-topic-2"), mockTime.milliseconds())
-
- assertEquals(1, cachedErrors.size, s"Expected only 1 error but found:
$cachedErrors")
- assertTrue(cachedErrors.contains("test-topic-2"))
- assertEquals("Topic already exists", cachedErrors("test-topic-2"))
- }
-
- @Test
- def testSendCreateTopicRequestEnvelopeHandling(): Unit = {
- // Test the sendCreateTopicRequest method (without error caching) handles
envelopes correctly
- autoTopicCreationManager = new DefaultAutoTopicCreationManager(
- config,
- brokerToController,
- groupCoordinator,
- transactionCoordinator,
- shareCoordinator,
- mockTime,
- topicErrorCacheCapacity = testCacheCapacity)
-
- val requestContext = initializeRequestContextWithUserPrincipal()
-
- // Call createTopics which uses sendCreateTopicRequest internally
- autoTopicCreationManager.createTopics(
- Set("test-topic"),
ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA,
Some(requestContext))
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
- // Create a CreateTopicsResponse with an error
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
- .setName("test-topic")
- .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())
- .setErrorMessage("Invalid topic name")
- createTopicsResponseData.topics().add(topicResult)
-
- val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
- val correlationId = requestContext.correlationId // Use the actual
correlation ID from request context
- val clientId = requestContext.clientId
-
- // Serialize the CreateTopicsResponse with header
- val responseHeader = new ResponseHeader(correlationId,
ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
- val serializedResponse = RequestUtils.serialize(responseHeader.data(),
responseHeader.headerVersion(),
-
createTopicsResponse.data(), requestVersion)
-
- // Create an EnvelopeResponse containing the serialized
CreateTopicsResponse
- val envelopeResponse = new EnvelopeResponse(serializedResponse,
Errors.NONE)
- val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId,
correlationId)
- val clientResponse = new ClientResponse(requestHeader, null, null,
- 0, 0, false, null, null, envelopeResponse)
-
- // Trigger the completion handler
- argumentCaptor.getValue.onComplete(clientResponse)
-
- // For sendCreateTopicRequest, errors are not cached, but we can verify
the handler completed without exception
- // The test passes if no exception is thrown during envelope processing
- }
-
@Test
def testErrorCacheExpirationBasedEviction(): Unit = {
// Create manager with small cache size for testing
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = 3)
val requestContext = initializeRequestContextWithUserPrincipal()
@@ -866,31 +532,21 @@ class AutoTopicCreationManagerTest {
topicName -> new
CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
)
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController, Mockito.atLeastOnce()).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
// Simulate error response for this topic
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
.setName(topicName)
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
.setErrorMessage(s"Topic '$topicName' already exists.")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
-
- argumentCaptor.getValue.onComplete(clientResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
+
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
* 2)
// Advance time slightly between additions to ensure different timestamps
mockTime.sleep(10)
-
}
// With cache size of 3, topics 1 and 2 should have been evicted
@@ -909,11 +565,11 @@ class AutoTopicCreationManagerTest {
def testTopicsInBackoffAreNotRetried(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
@@ -922,28 +578,19 @@ class AutoTopicCreationManagerTest {
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
- // First attempt - trigger topic creation
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
// Simulate error response to cache the error
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
.setErrorMessage("Invalid replication factor")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
- argumentCaptor.getValue.onComplete(clientResponse)
+ // First attempt - trigger topic creation
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
// Verify error is cached
val cachedErrors =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
@@ -953,20 +600,18 @@ class AutoTopicCreationManagerTest {
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
// Verify still only one request was sent (not retried during back-off)
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
}
@Test
def testTopicsOutOfBackoffCanBeRetried(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
@@ -975,28 +620,19 @@ class AutoTopicCreationManagerTest {
val requestContext = initializeRequestContextWithUserPrincipal()
val shortTtlMs = 1000L
- // First attempt - trigger topic creation
- autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
-
- val argumentCaptor =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor.capture())
-
// Simulate error response to cache the error
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val createTopicsResponseData = new CreateTopicsResponseData()
+ val topicResult = new CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
.setErrorMessage("Invalid replication factor")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
+ topicCreator.setResponseForWithPrincipal(createTopicsResponse)
- argumentCaptor.getValue.onComplete(clientResponse)
+ // First attempt - trigger topic creation
+ autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
// Verify error is cached
val cachedErrors1 =
autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"),
mockTime.milliseconds())
@@ -1013,20 +649,18 @@ class AutoTopicCreationManagerTest {
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, shortTtlMs)
// Verify a second request was sent (retry allowed after back-off expires)
- Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(2, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal twice")
}
@Test
def testInflightTopicsAreNotRetriedConcurrently(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
@@ -1035,31 +669,31 @@ class AutoTopicCreationManagerTest {
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
+ // Use a future that doesn't complete immediately to simulate in-flight
state
+ val future = new CompletableFuture[CreateTopicsResponse]()
+ topicCreator.setFutureForWithPrincipal(future)
+
// First call - should send request and mark topic as in-flight
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
// Second concurrent call - should NOT send request because topic is
in-flight
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
// Verify still only one request was sent (concurrent request blocked)
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(1, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal once")
}
@Test
def testBackoffAndInflightInteraction(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
- brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
+ topicCreator,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
@@ -1070,56 +704,45 @@ class AutoTopicCreationManagerTest {
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
- // Create error for backoff-topic
- val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
- autoTopicCreationManager.createStreamsInternalTopics(backoffOnly,
requestContext, timeoutMs)
-
- val argumentCaptor1 =
ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
- Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- argumentCaptor1.capture())
-
// Simulate error response for backoff-topic
- val createTopicsResponseData = new
org.apache.kafka.common.message.CreateTopicsResponseData()
- val topicResult = new
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
+ val backoffResponseData = new CreateTopicsResponseData()
+ val backoffResult = new CreatableTopicResult()
.setName("backoff-topic")
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
.setErrorMessage("Invalid replication factor")
- createTopicsResponseData.topics().add(topicResult)
+ backoffResponseData.topics().add(backoffResult)
+ val backoffResponse = new CreateTopicsResponse(backoffResponseData)
+ topicCreator.setResponseForWithPrincipal(backoffResponse)
- val createTopicsResponse = new
CreateTopicsResponse(createTopicsResponseData)
- val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
- val clientResponse = new ClientResponse(header, null, null,
- 0, 0, false, null, null, createTopicsResponse)
-
- argumentCaptor1.getValue.onComplete(clientResponse)
+ // Create error for backoff-topic
+ val backoffOnly = Map("backoff-topic" -> topics("backoff-topic"))
+ autoTopicCreationManager.createStreamsInternalTopics(backoffOnly,
requestContext, timeoutMs)
// Make inflight-topic in-flight (without completing the request)
+ val inflightFuture = new CompletableFuture[CreateTopicsResponse]()
+ topicCreator.setFutureForWithPrincipal(inflightFuture)
+
val inflightOnly = Map("inflight-topic" -> topics("inflight-topic"))
autoTopicCreationManager.createStreamsInternalTopics(inflightOnly,
requestContext, timeoutMs)
- Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
- any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
- any(classOf[ControllerRequestCompletionHandler]))
-
// Now attempt to create all three topics together
+ val normalResponseData = new CreateTopicsResponseData()
+ val normalResult = new CreatableTopicResult()
+ .setName("normal-topic")
+ .setErrorCode(Errors.NONE.code())
+ normalResponseData.topics().add(normalResult)
+ val normalResponse = new CreateTopicsResponse(normalResponseData)
+ topicCreator.setResponseForWithPrincipal(normalResponse)
+
autoTopicCreationManager.createStreamsInternalTopics(topics,
requestContext, timeoutMs)
- val argumentCaptor2 =
ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
// Total 3 requests: 1 for backoff-topic, 1 for inflight-topic, 1 for
normal-topic only
- Mockito.verify(brokerToController, Mockito.times(3)).sendRequest(
- argumentCaptor2.capture(),
- any(classOf[ControllerRequestCompletionHandler]))
+ assertEquals(3, topicCreator.withPrincipalCallCount, "Should have called
createTopicWithPrincipal 3 times")
// Verify that only normal-topic was included in the last request
- val lastRequest =
argumentCaptor2.getValue.asInstanceOf[EnvelopeRequest.Builder]
- .build(ApiKeys.ENVELOPE.latestVersion())
- val forwardedRequestBuffer = lastRequest.requestData().duplicate()
- val requestHeader = RequestHeader.parse(forwardedRequestBuffer)
- val parsedRequest = CreateTopicsRequest.parse(new
org.apache.kafka.common.protocol.ByteBufferAccessor(forwardedRequestBuffer),
- requestHeader.apiVersion())
-
- val topicNames = parsedRequest.data().topics().asScala.map(_.name()).toSet
+ val calls = topicCreator.getWithPrincipalCalls
+ val lastRequest = calls(2)._2.build()
+ val topicNames = lastRequest.data().topics().asScala.map(_.name()).toSet
assertEquals(1, topicNames.size, "Only normal-topic should be created")
assertTrue(topicNames.contains("normal-topic"), "normal-topic should be in
the request")
assertTrue(!topicNames.contains("backoff-topic"), "backoff-topic should be
filtered (in back-off)")