This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 0a7ea3f7dc4 KAFKA-14358; Disallow creation of cluster metadata topic
(#12885)
0a7ea3f7dc4 is described below
commit 0a7ea3f7dc4c88c57f185688086ffacb3e2443c5
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu Dec 1 18:34:29 2022 -0800
KAFKA-14358; Disallow creation of cluster metadata topic (#12885)
With KRaft, the cluster metadata topic (__cluster_metadata) has a different
implementation compared to regular topics. The user should not be allowed to
create this topic. Creating it as a regular topic can cause issues if the
metadata log dir is the same as one of the log dirs.
This change returns an authorization error if the user tries to create the
cluster metadata topic.
Reviewers: David Arthur <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 14 ++++-----
.../org/apache/kafka/common/internals/Topic.java | 7 +++--
.../kafka/clients/admin/KafkaAdminClientTest.java | 4 +--
.../main/scala/kafka/server/ControllerApis.scala | 19 +++++++++---
core/src/main/scala/kafka/server/KafkaApis.scala | 36 ++++++++++++++++++----
.../main/scala/kafka/server/KafkaRaftServer.scala | 4 +--
.../server/AbstractCreateTopicsRequestTest.scala | 4 +--
.../unit/kafka/server/ControllerApisTest.scala | 3 +-
.../kafka/server/CreateTopicsRequestTest.scala | 11 ++++++-
9 files changed, 75 insertions(+), 27 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1b837fda223..89bc011a4d5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -274,8 +274,8 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_NAME;
-import static org.apache.kafka.common.internals.Topic.METADATA_TOPIC_PARTITION;
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME;
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
import static
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignablePartition;
import static
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import static
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -4377,7 +4377,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
DescribeQuorumRequest.Builder createRequest(int timeoutMs) {
return new Builder(DescribeQuorumRequest.singletonRequest(
- new TopicPartition(METADATA_TOPIC_NAME,
METADATA_TOPIC_PARTITION.partition())));
+ new TopicPartition(CLUSTER_METADATA_TOPIC_NAME,
CLUSTER_METADATA_TOPIC_PARTITION.partition())));
}
@Override
@@ -4393,9 +4393,9 @@ public class KafkaAdminClient extends AdminClient {
throw new UnknownServerException(msg);
}
DescribeQuorumResponseData.TopicData topic =
quorumResponse.data().topics().get(0);
- if (!topic.topicName().equals(METADATA_TOPIC_NAME)) {
+ if (!topic.topicName().equals(CLUSTER_METADATA_TOPIC_NAME)) {
String msg = String.format("DescribeMetadataQuorum
received a topic with name %s when %s was expected",
- topic.topicName(), METADATA_TOPIC_NAME);
+ topic.topicName(), CLUSTER_METADATA_TOPIC_NAME);
log.debug(msg);
throw new UnknownServerException(msg);
}
@@ -4406,9 +4406,9 @@ public class KafkaAdminClient extends AdminClient {
throw new UnknownServerException(msg);
}
DescribeQuorumResponseData.PartitionData partition =
topic.partitions().get(0);
- if (partition.partitionIndex() !=
METADATA_TOPIC_PARTITION.partition()) {
+ if (partition.partitionIndex() !=
CLUSTER_METADATA_TOPIC_PARTITION.partition()) {
String msg = String.format("DescribeMetadataQuorum
received a single partition with index %d when %d was expected",
- partition.partitionIndex(),
METADATA_TOPIC_PARTITION.partition());
+ partition.partitionIndex(),
CLUSTER_METADATA_TOPIC_PARTITION.partition());
log.debug(msg);
throw new UnknownServerException(msg);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 92952a2c031..fe4a01c52a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -28,8 +28,11 @@ public class Topic {
public static final String GROUP_METADATA_TOPIC_NAME =
"__consumer_offsets";
public static final String TRANSACTION_STATE_TOPIC_NAME =
"__transaction_state";
- public static final String METADATA_TOPIC_NAME = "__cluster_metadata";
- public static final TopicPartition METADATA_TOPIC_PARTITION = new
TopicPartition(METADATA_TOPIC_NAME, 0);
+ public static final String CLUSTER_METADATA_TOPIC_NAME =
"__cluster_metadata";
+ public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new
TopicPartition(
+ CLUSTER_METADATA_TOPIC_NAME,
+ 0
+ );
public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
private static final Set<String> INTERNAL_TOPICS =
Collections.unmodifiableSet(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 471551255e2..652c29a6e4b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -625,8 +625,8 @@ public class KafkaAdminClientTest {
Boolean partitionCountError,
Boolean partitionIndexError,
Boolean emptyOptionals) {
- String topicName = topicNameError ? "RANDOM" :
Topic.METADATA_TOPIC_NAME;
- Integer partitionIndex = partitionIndexError ? 1 :
Topic.METADATA_TOPIC_PARTITION.partition();
+ String topicName = topicNameError ? "RANDOM" :
Topic.CLUSTER_METADATA_TOPIC_NAME;
+ Integer partitionIndex = partitionIndexError ? 1 :
Topic.CLUSTER_METADATA_TOPIC_PARTITION.partition();
List<DescribeQuorumResponseData.TopicData> topics = new ArrayList<>();
List<DescribeQuorumResponseData.PartitionData> partitions = new
ArrayList<>();
for (int i = 0; i < (partitionCountError ? 2 : 1); i++) {
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 235660cec0a..657c2965533 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.acl.AclOperation.{ALTER,
ALTER_CONFIGS, CLUSTER_A
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException,
ClusterAuthorizationException, InvalidRequestException,
TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
=> OldAlterConfigsResourceResponse}
import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
@@ -374,12 +375,21 @@ class ControllerApis(val requestChannel: RequestChannel,
}
}
}
+
+ /* The cluster metadata topic is an internal topic with a different
implementation. The user should not be
+ * allowed to create it as a regular topic.
+ */
+ if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+ info(s"Rejecting creation of internal topic
${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+ }
+ val allowedTopicNames =
topicNames.asScala.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+
val authorizedTopicNames = if (hasClusterAuth) {
- topicNames.asScala
+ allowedTopicNames
} else {
- getCreatableTopics.apply(topicNames.asScala)
+ getCreatableTopics.apply(allowedTopicNames)
}
- val describableTopicNames =
getDescribableTopics.apply(topicNames.asScala).asJava
+ val describableTopicNames =
getDescribableTopics.apply(allowedTopicNames).asJava
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
while (iterator.hasNext) {
@@ -400,7 +410,8 @@ class ControllerApis(val requestChannel: RequestChannel,
if (!authorizedTopicNames.contains(name)) {
response.topics().add(new CreatableTopicResult().
setName(name).
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code))
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
+ setErrorMessage("Authorization failed."))
}
}
response
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 546361f352b..07e33112bcf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1925,12 +1925,36 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val hasClusterAuthorization = authHelper.authorize(request.context,
CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false)
- val topics = createTopicsRequest.data.topics.asScala.map(_.name)
- val authorizedTopics =
- if (hasClusterAuthorization) topics.toSet
- else authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
topics)(identity)
- val authorizedForDescribeConfigs =
authHelper.filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
- topics, logIfDenied = false)(identity).map(name => name ->
results.find(name)).toMap
+
+ val allowedTopicNames = {
+ val topicNames = createTopicsRequest
+ .data
+ .topics
+ .asScala
+ .map(_.name)
+ .toSet
+
+ /* The cluster metadata topic is an internal topic with a different
implementation. The user should not be
+ * allowed to create it as a regular topic.
+ */
+ if (topicNames.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)) {
+ info(s"Rejecting creation of internal topic
${Topic.CLUSTER_METADATA_TOPIC_NAME}")
+ }
+ topicNames.diff(Set(Topic.CLUSTER_METADATA_TOPIC_NAME))
+ }
+
+ val authorizedTopics = if (hasClusterAuthorization) {
+ allowedTopicNames.toSet
+ } else {
+ authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
allowedTopicNames)(identity)
+ }
+ val authorizedForDescribeConfigs = authHelper.filterByAuthorized(
+ request.context,
+ DESCRIBE_CONFIGS,
+ TOPIC,
+ allowedTopicNames,
+ logIfDenied = false
+ )(identity).map(name => name -> results.find(name)).toMap
results.forEach { topic =>
if (results.findAll(topic.name).size > 1) {
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 84ea10da57f..76a874b2197 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -161,8 +161,8 @@ class KafkaRaftServer(
}
object KafkaRaftServer {
- val MetadataTopic = Topic.METADATA_TOPIC_NAME
- val MetadataPartition = Topic.METADATA_TOPIC_PARTITION
+ val MetadataTopic = Topic.CLUSTER_METADATA_TOPIC_NAME
+ val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
val MetadataTopicId = Uuid.METADATA_TOPIC_ID
sealed trait ProcessRole
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index ecee2cd19c4..627a939ddcf 100644
---
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -163,9 +163,9 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
if (actual == null) {
throw new RuntimeException(s"No response data found for topic
$topicName")
}
- assertEquals(expected.error.code(), actual.errorCode(), "The response
error should match")
+ assertEquals(expected.error.code(), actual.errorCode(), "The response
error code should match")
if (checkErrorMessage) {
- assertEquals(expected.message, actual.errorMessage())
+ assertEquals(expected.message, actual.errorMessage(), "The response
error message should match")
}
// If no error validate topic exists
if (expectedError.isSuccess && !request.data.validateOnly) {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 05bd13d795d..969d57c0dd2 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -554,7 +554,8 @@ class ControllerApisTest {
setTopicId(new Uuid(0L, 2L)).
setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()),
new CreatableTopicResult().setName("quux").
- setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+ setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
+ setErrorMessage("Authorization failed."))
assertEquals(expectedResponse,
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
false,
_ => Set("baz", "indescribable"),
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 57834234cc1..a193db284c4 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
import kafka.utils._
import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.protocol.ApiKeys
@@ -27,7 +28,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
-
import scala.jdk.CollectionConverters._
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@@ -197,4 +197,13 @@ class CreateTopicsRequestTest extends
AbstractCreateTopicsRequestTest {
assertEquals(Uuid.ZERO_UUID, topicResponse.topicId())
}
}
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateClusterMetadataTopic(quorum: String): Unit = {
+ validateErrorCreateTopicsRequests(
+ topicsReq(Seq(topicReq(Topic.CLUSTER_METADATA_TOPIC_NAME))),
+ Map(Topic.CLUSTER_METADATA_TOPIC_NAME ->
error(Errors.TOPIC_AUTHORIZATION_FAILED, Some("Authorization failed.")))
+ )
+ }
}