This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 0a7ea3f7dc4 KAFKA-14358; Disallow creation of cluster metadata topic 
(#12885)
0a7ea3f7dc4 is described below

commit 0a7ea3f7dc4c88c57f185688086ffacb3e2443c5
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu Dec 1 18:34:29 2022 -0800

    KAFKA-14358; Disallow creation of cluster metadata topic (#12885)
    
    With KRaft, the cluster metadata topic (__cluster_metadata) has a different 
implementation compared to regular topics. The user should not be allowed to 
create this topic. Creating it can cause issues if the metadata log dir is the 
same as one of the log dirs.
    
    This change returns an authorization error if the user tries to create the 
cluster metadata topic.
    
    Reviewers: David Arthur <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 14 ++++-----
 .../org/apache/kafka/common/internals/Topic.java   |  7 +++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  4 +--
 .../main/scala/kafka/server/ControllerApis.scala   | 19 +++++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   | 36 ++++++++++++++++++----
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  4 +--
 .../server/AbstractCreateTopicsRequestTest.scala   |  4 +--
 .../unit/kafka/server/ControllerApisTest.scala     |  3 +-
 .../kafka/server/CreateTopicsRequestTest.scala     | 11 ++++++-
 9 files changed, 75 insertions(+), 27 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1b837fda223..89bc011a4d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -274,8 +274,8 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_NAME;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_PARTITION;
+import static 
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;
+import static 
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -4377,7 +4377,7 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
                 return new Builder(DescribeQuorumRequest.singletonRequest(
-                        new TopicPartition(METADATA_TOPIC_NAME, 
METADATA_TOPIC_PARTITION.partition())));
+                        new TopicPartition(CLUSTER_METADATA_TOPIC_NAME, 
CLUSTER_METADATA_TOPIC_PARTITION.partition())));
             }
 
             @Override
@@ -4393,9 +4393,9 @@ public class KafkaAdminClient extends AdminClient {
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.TopicData topic = 
quorumResponse.data().topics().get(0);
-                if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+                if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
                     String msg = String.format("DescribeMetadataQuorum 
received a topic with name %s when %s was expected",
-                            topic.topicName(), METADATA_TOPIC_NAME);
+                            topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
@@ -4406,9 +4406,9 @@ public class KafkaAdminClient extends AdminClient {
                     throw new UnknownServerException(msg);
                 }
                 DescribeQuorumResponseData.PartitionData partition = 
topic.partitions().get(0);
-                if (partition.partitionIndex() != 
METADATA_TOPIC_PARTITION.partition()) {
+                if (partition.partitionIndex() != 
CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
                     String msg = String.format("DescribeMetadataQuorum 
received a single partition with index %d when %d was expected",
-                            partition.partitionIndex(), 
METADATA_TOPIC_PARTITION.partition());
+                            partition.partitionIndex(), 
CLUSTER_METADATA_TOPIC_PARTITION.partition());
                     log.debug(msg);
                     throw new UnknownServerException(msg);
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java 
b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 92952a2c031..fe4a01c52a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,8 +28,11 @@ public class Topic {
 
     public static final String GROUP_METADATA_TOPIC_NAME = 
"__consumer_offsets";
     public static final String TRANSACTION_STATE_TOPIC_NAME = 
"__transaction_state";
-    public static final String METADATA_TOPIC_NAME = "__cluster_metadata";
-    public static final TopicPartition METADATA_TOPIC_PARTITION = new 
TopicPartition(METADATA_TOPIC_NAME, 0);
+    public static final String CLUSTER_METADATA_TOPIC_NAME = 
"__cluster_metadata";
+    public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new 
TopicPartition(
+        CLUSTER_METADATA_TOPIC_NAME,
+        0
+    );
     public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
 
     private static final Set<String> INTERNAL_TOPICS = 
Collections.unmodifiableSet(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 471551255e2..652c29a6e4b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -625,8 +625,8 @@ public class KafkaAdminClientTest {
             Boolean partitionCountError,
             Boolean partitionIndexError,
             Boolean emptyOptionals) {
-        String topicName = topicNameError ? "RANDOM" : 
Topic.METADATA_TOPIC_NAME;
-        Integer partitionIndex = partitionIndexError ? 1 : 
Topic.METADATA_TOPIC_PARTITION.partition();
+        String topicName = topicNameError ? "RANDOM" : 
Topic.CLUSTER_METADATA_TOPIC_NAME;
+        Integer partitionIndex = partitionIndexError ? 1 : 
Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition();
         List<DescribeQuorumResponseData.TopicData> topics = new ArrayList<>();
         List<DescribeQuorumResponseData.PartitionData> partitions = new 
ArrayList<>();
         for (int i = 0; i < (partitionCountError ? 2 : 1); i++) {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 235660cec0a..657c2965533 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.acl.AclOperation.{ALTER, 
ALTER_CONFIGS, CLUSTER_A
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{ApiException, 
ClusterAuthorizationException, InvalidRequestException, 
TopicDeletionDisabledException}
 import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
 => OldAlterConfigsResourceResponse}
 import 
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
 import 
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -374,12 +375,21 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
       }
     }
+
+    /* The cluster metadata topic is an internal topic with a different 
implementation. The user should not be
+     * allowed to create it as a regular topic.
+     */
+    if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+      info(s"Rejecting creation of internal topic 
${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+    }
+    val allowedTopicNames = 
topicNames.asScala.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+
     val authorizedTopicNames = if (hasClusterAuth) {
-      topicNames.asScala
+      allowedTopicNames
     } else {
-      getCreatableTopics.apply(topicNames.asScala)
+      getCreatableTopics.apply(allowedTopicNames)
     }
-    val describableTopicNames = 
getDescribableTopics.apply(topicNames.asScala).asJava
+    val describableTopicNames = 
getDescribableTopics.apply(allowedTopicNames).asJava
     val effectiveRequest = request.duplicate()
     val iterator = effectiveRequest.topics().iterator()
     while (iterator.hasNext) {
@@ -400,7 +410,8 @@ class ControllerApis(val requestChannel: RequestChannel,
         if (!authorizedTopicNames.contains(name)) {
           response.topics().add(new CreatableTopicResult().
             setName(name).
-            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
+            setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+            setErrorMessage("Authorization failed."))
         }
       }
       response
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 546361f352b..07e33112bcf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1925,12 +1925,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       val hasClusterAuthorization = authHelper.authorize(request.context, 
CREATE, CLUSTER, CLUSTER_NAME,
         logIfDenied = false)
-      val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-      val authorizedTopics =
-        if (hasClusterAuthorization) topics.toSet
-        else authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-      val authorizedForDescribeConfigs = 
authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-        topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+
+      val allowedTopicNames = {
+        val topicNames = createTopicsRequest
+          .data
+          .topics
+          .asScala
+          .map(_.name)
+          .toSet
+
+          /* The cluster metadata topic is an internal topic with a different 
implementation. The user should not be
+           * allowed to create it as a regular topic.
+           */
+          if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+            info(s"Rejecting creation of internal topic 
${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+          }
+          topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+      }
+
+      val authorizedTopics = if (hasClusterAuthorization) {
+        allowedTopicNames.toSet
+      } else {
+        authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
allowedTopicNames)(identity)
+      }
+      val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
+        request.context,
+        DESCRIBE_CONFIGS,
+        TOPIC,
+        allowedTopicNames,
+        logIfDenied = false
+      )(identity).map(name => name -> results.find(name)).toMap
 
       results.forEach { topic =>
         if (results.findAll(topic.name).size > 1) {
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 84ea10da57f..76a874b2197 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -161,8 +161,8 @@ class KafkaRaftServer(
 }
 
 object KafkaRaftServer {
-  val MetadataTopic = Topic.METADATA_TOPIC_NAME
-  val MetadataPartition = Topic.METADATA_TOPIC_PARTITION
+  val MetadataTopic = Topic.CLUSTER_METADATA_TOPIC_NAME
+  val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
   val MetadataTopicId = Uuid.METADATA_TOPIC_ID
 
   sealed trait ProcessRole
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index ecee2cd19c4..627a939ddcf 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -163,9 +163,9 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
       if (actual == null) {
         throw new RuntimeException(s"No response data found for topic 
$topicName")
       }
-      assertEquals(expected.error.code(), actual.errorCode(), "The response 
error should match")
+      assertEquals(expected.error.code(), actual.errorCode(), "The response 
error code should match")
       if (checkErrorMessage) {
-        assertEquals(expected.message, actual.errorMessage())
+        assertEquals(expected.message, actual.errorMessage(), "The response 
error message should match")
       }
       // If no error validate topic exists
       if (expectedError.isSuccess && !request.data.validateOnly) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 05bd13d795d..969d57c0dd2 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -554,7 +554,8 @@ class ControllerApisTest {
         setTopicId(new Uuid(0L, 2L)).
         setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
       new CreatableTopicResult().setName("quux").
-        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+        setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+        setErrorMessage("Authorization failed."))
     assertEquals(expectedResponse, 
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
       false,
       _ => Set("baz", "indescribable"),
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 57834234cc1..a193db284c4 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import kafka.utils._
 import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
 import org.apache.kafka.common.protocol.ApiKeys
@@ -27,7 +28,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
-
 import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@@ -197,4 +197,13 @@ class CreateTopicsRequestTest extends 
AbstractCreateTopicsRequestTest {
         assertEquals(Uuid.ZERO_UUID, topicResponse.topicId())
     }
   }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateClusterMetadataTopic(quorum: String): Unit = {
+    validateErrorCreateTopicsRequests(
+      topicsReq(Seq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME))),
+      Map(Topic.CLUSTER_METADATA_TOPIC_NAME -> 
error(Errors.TOPIC_AUTHORIZATION_FAILED, Some("Authorization failed.")))
+    )
+  }
 }

Reply via email to