jsancio commented on code in PR #12885:
URL: https://github.com/apache/kafka/pull/12885#discussion_r1035322059
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1925,9 +1925,18 @@ class KafkaApis(val requestChannel: RequestChannel,
val hasClusterAuthorization = authHelper.authorize(request.context,
CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false)
val topics = createTopicsRequest.data.topics.asScala.map(_.name)
- val authorizedTopics =
- if (hasClusterAuthorization) topics.toSet
- else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+ val authorizedTopics = if (hasClusterAuthorization) {
+ /* The cluster metadata topic is an internal topic with a different implementation. The user should not be
+ * allowed to create it as a regular topic.
+ */
+ topics.toSet.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
Review Comment:
> Also, should we log a warning if we find it (and remove it)?
I don't think it should be a warning. The code is behaving as expected. We can
log at INFO, DEBUG or TRACE.
The user is trying to create a topic that the cluster doesn't allow. We can
either return a better `errorMessage` in the response or log an info message.
I'll change the code to log an info message.
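
For illustration only, a rough sketch of the idea, not the actual change to `KafkaApis`. It assumes the metadata topic name is `__cluster_metadata` (hard-coded here in place of `Topic.CLUSTER_METADATA_TOPIC_NAME`), and the `authorizedTopics` helper and its `logInfo` callback are hypothetical names made up for this example:

```scala
object CreateTopicsFilterSketch {
  // Assumption: stands in for Topic.CLUSTER_METADATA_TOPIC_NAME so the sketch is self-contained.
  val ClusterMetadataTopicName = "__cluster_metadata"

  // Hypothetical helper: drops the cluster metadata topic from the requested
  // topic names and reports the removal through the supplied INFO logger.
  def authorizedTopics(topics: Iterable[String], logInfo: String => Unit): Set[String] = {
    val requested = topics.toSet
    if (requested.contains(ClusterMetadataTopicName)) {
      // Expected behavior rather than a fault, so INFO instead of WARN.
      logInfo(s"Rejecting creation of internal topic $ClusterMetadataTopicName")
      requested - ClusterMetadataTopicName
    } else {
      requested
    }
  }
}
```

A caller could pass `msg => println(msg)` or the class's own `info` logging method as `logInfo`.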