This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new ebcefed404d KAFKA-18407: Remove ZkAdminManager, DelayedCreatePartitions, CreatePartitionsMetadata, ZkConfigRepository, DelayedDeleteTopics (#18574) ebcefed404d is described below commit ebcefed404d67e4474031f0d8e3071f0d81cfcc6 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Fri Jan 17 02:32:22 2025 +0800 KAFKA-18407: Remove ZkAdminManager, DelayedCreatePartitions, CreatePartitionsMetadata, ZkConfigRepository, DelayedDeleteTopics (#18574) Reviewers: Mickael Maison <mickael.mai...@gmail.com> --- .../TopicAlreadyMarkedForDeletionException.scala | 21 - .../kafka/server/DelayedCreatePartitions.scala | 104 -- .../scala/kafka/server/DelayedDeleteTopics.scala | 85 -- .../main/scala/kafka/server/ZkAdminManager.scala | 1046 -------------------- .../kafka/server/metadata/ZkConfigRepository.scala | 51 - 5 files changed, 1307 deletions(-) diff --git a/core/src/main/scala/kafka/common/TopicAlreadyMarkedForDeletionException.scala b/core/src/main/scala/kafka/common/TopicAlreadyMarkedForDeletionException.scala deleted file mode 100644 index c83cea96b5d..00000000000 --- a/core/src/main/scala/kafka/common/TopicAlreadyMarkedForDeletionException.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -class TopicAlreadyMarkedForDeletionException(message: String) extends RuntimeException(message) { -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala deleted file mode 100644 index 5f204a24ff1..00000000000 --- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.utils.Logging -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.ApiError -import org.apache.kafka.metadata.LeaderAndIsr -import org.apache.kafka.server.purgatory.DelayedOperation - -import scala.collection._ - -/** - * The create metadata maintained by the delayed create topic or create partitions operations. - */ -case class CreatePartitionsMetadata(topic: String, partitions: Set[Int], error: ApiError) - -object CreatePartitionsMetadata { - def apply(topic: String, partitions: Set[Int]): CreatePartitionsMetadata = { - CreatePartitionsMetadata(topic, partitions, ApiError.NONE) - } - - def apply(topic: String, error: Errors): CreatePartitionsMetadata = { - CreatePartitionsMetadata(topic, Set.empty, new ApiError(error, null)) - } - - def apply(topic: String, throwable: Throwable): CreatePartitionsMetadata = { - CreatePartitionsMetadata(topic, Set.empty, ApiError.fromThrowable(throwable)) - } -} - -/** - * A delayed create topic or create partitions operation that is stored in the topic purgatory. - */ -class DelayedCreatePartitions(delayMs: Long, - createMetadata: Seq[CreatePartitionsMetadata], - adminManager: ZkAdminManager, - responseCallback: Map[String, ApiError] => Unit) - extends DelayedOperation(delayMs) with Logging { - - /** - * The operation can be completed if all of the topics that do not have an error exist and every partition has a - * leader in the controller. - * See KafkaController.onNewTopicCreation - */ - override def tryComplete() : Boolean = { - trace(s"Trying to complete operation for $createMetadata") - - val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess).foldLeft(0) { case (topicCounter, metadata) => - topicCounter + missingLeaderCount(metadata.topic, metadata.partitions) - } - - if (leaderlessPartitionCount == 0) { - trace("All partitions have a leader, completing the delayed operation") - forceComplete() - } else { - trace(s"$leaderlessPartitionCount partitions do not have a leader, not completing the delayed operation") - false - } - } - - /** - * Check for partitions that are still missing a leader, update their error code and call the responseCallback - */ - override def onComplete(): Unit = { - trace(s"Completing operation for $createMetadata") - val results = createMetadata.map { metadata => - // ignore topics that already have errors - if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.partitions) > 0) - (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) - else - (metadata.topic, metadata.error) - }.toMap - responseCallback(results) - } - - override def onExpiration(): Unit = {} - - private def missingLeaderCount(topic: String, partitions: Set[Int]): Int = { - partitions.foldLeft(0) { case (counter, partition) => - if (isMissingLeader(topic, partition)) counter + 1 else counter - } - } - - private def isMissingLeader(topic: String, partition: Int): Boolean = { - val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition) - partitionInfo.forall(_.leader == LeaderAndIsr.NO_LEADER) - } -} diff --git a/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala b/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala deleted file mode 100644 index 4ec4698aecb..00000000000 --- a/core/src/main/scala/kafka/server/DelayedDeleteTopics.scala +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.utils.Logging -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.server.purgatory.DelayedOperation - -import scala.collection._ - -/** - * The delete metadata maintained by the delayed delete operation - */ -case class DeleteTopicMetadata(topic: String, error: Errors) - -object DeleteTopicMetadata { - def apply(topic: String, throwable: Throwable): DeleteTopicMetadata = { - DeleteTopicMetadata(topic, Errors.forException(throwable)) - } -} - -/** - * A delayed delete topics operation that can be created by the admin manager and watched - * in the topic purgatory - */ -class DelayedDeleteTopics(delayMs: Long, - deleteMetadata: Seq[DeleteTopicMetadata], - adminManager: ZkAdminManager, - responseCallback: Map[String, Errors] => Unit) - extends DelayedOperation(delayMs) with Logging { - - /** - * The operation can be completed if all of the topics not in error have been removed - */ - override def tryComplete() : Boolean = { - trace(s"Trying to complete operation for $deleteMetadata") - - // Ignore topics that already have errors - val existingTopics = deleteMetadata.count { metadata => metadata.error == Errors.NONE && topicExists(metadata.topic) } - - if (existingTopics == 0) { - trace("All topics have been deleted or have errors, completing the delayed operation") - forceComplete() - } else { - trace(s"$existingTopics topics still exist, not completing the delayed operation") - false - } - } - - /** - * Check for partitions that still exist, update their error code and call the responseCallback - */ - override def onComplete(): Unit = { - trace(s"Completing operation for $deleteMetadata") - val results = deleteMetadata.map { metadata => - // ignore topics that already have errors - if (metadata.error == Errors.NONE && topicExists(metadata.topic)) - (metadata.topic, Errors.REQUEST_TIMED_OUT) - else - (metadata.topic, metadata.error) - }.toMap - responseCallback(results) - } - - override def onExpiration(): Unit = { } - - private def topicExists(topic: String): Boolean = { - adminManager.metadataCache.contains(topic) - } -} diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala deleted file mode 100644 index 07bb34c770b..00000000000 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ /dev/null @@ -1,1046 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import java.util -import java.util.Properties -import kafka.common.TopicAlreadyMarkedForDeletionException -import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, toLoggableProps} -import kafka.server.metadata.ZkConfigRepository -import kafka.utils._ -import kafka.zk.{AdminZkClient, KafkaZkClient} -import org.apache.kafka.admin.AdminUtils -import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} -import org.apache.kafka.common.Uuid -import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource} -import org.apache.kafka.common.errors.ThrottlingQuotaExceededException -import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException} -import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult -import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic -import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicConfigs, CreatableTopicResult} -import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, AlterUserScramCredentialsResponseData, DescribeUserScramCredentialsResponseData} -import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.security.scram.internals.{ScramMechanism => InternalScramMechanism} -import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} -import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} -import org.apache.kafka.common.requests.CreateTopicsRequest._ -import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError} -import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} -import org.apache.kafka.common.utils.{Sanitizer, Utils} -import org.apache.kafka.server.common.AdminOperationException -import org.apache.kafka.server.config.{ConfigType, QuotaConfig, ZooKeeperInternals} -import org.apache.kafka.server.config.ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG -import org.apache.kafka.server.config.ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG -import org.apache.kafka.server.purgatory.{DelayedOperation, DelayedOperationPurgatory} -import org.apache.kafka.storage.internals.log.LogConfig - -import scala.collection.{Map, mutable, _} -import scala.jdk.CollectionConverters._ - -object ZkAdminManager { - def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = { - props.map { case (key, value) => - val doubleValue = try value.toDouble catch { - case _: NumberFormatException => - throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") - } - key -> doubleValue - } - } -} - - -class ZkAdminManager(val config: KafkaConfig, - val metrics: Metrics, - val metadataCache: MetadataCache, - val zkClient: KafkaZkClient) extends Logging { - - this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: " - - private val topicPurgatory = new DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId) - private val adminZkClient = new AdminZkClient(zkClient, Some(config)) - private val configHelper = new ConfigHelper(metadataCache, config, new ZkConfigRepository(adminZkClient)) - - private val createTopicPolicy = - Option(config.getConfiguredInstance(CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, classOf[CreateTopicPolicy])) - - private val alterConfigPolicy = - Option(config.getConfiguredInstance(ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, classOf[AlterConfigPolicy])) - - def hasDelayedTopicOperations: Boolean = topicPurgatory.numDelayed != 0 - - private val defaultNumPartitions = config.numPartitions.intValue() - private val defaultReplicationFactor = config.defaultReplicationFactor.shortValue() - - /** - * Try to complete delayed topic operations with the request key - */ - def tryCompleteDelayedTopicOperations(topic: String): Unit = { - val key = TopicKey(topic) - val completed = topicPurgatory.checkAndComplete(key) - debug(s"Request key ${key.keyLabel} unblocked $completed topic requests.") - } - - private def validateTopicCreatePolicy(topic: CreatableTopic, - resolvedNumPartitions: Int, - resolvedReplicationFactor: Short, - assignments: Map[Int, Seq[Int]]): Unit = { - createTopicPolicy.foreach { policy => - // Use `null` for unset fields in the public API - val numPartitions: java.lang.Integer = - if (topic.assignments().isEmpty) resolvedNumPartitions else null - val replicationFactor: java.lang.Short = - if (topic.assignments().isEmpty) resolvedReplicationFactor else null - val javaAssignments = if (topic.assignments().isEmpty) { - null - } else { - assignments.map { case (k, v) => - (k: java.lang.Integer) -> v.map(i => i: java.lang.Integer).asJava - }.asJava - } - val javaConfigs = new java.util.HashMap[String, String] - topic.configs.forEach(config => javaConfigs.put(config.name, config.value)) - policy.validate(new RequestMetadata(topic.name, numPartitions, replicationFactor, - javaAssignments, javaConfigs)) - } - } - - private def maybePopulateMetadataAndConfigs(metadataAndConfigs: Map[String, CreatableTopicResult], - topicName: String, - configs: Properties, - assignments: Map[Int, Seq[Int]]): Unit = { - metadataAndConfigs.get(topicName).foreach { result => - val logConfig = LogConfig.fromProps(config.extractLogConfigMap, configs) - val createEntry = configHelper.createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _) - val topicConfigs = configHelper.allConfigs(logConfig).map { case (k, v) => - val entry = createEntry(k, v) - new CreatableTopicConfigs() - .setName(k) - .setValue(entry.value) - .setIsSensitive(entry.isSensitive) - .setReadOnly(entry.readOnly) - .setConfigSource(entry.configSource) - }.toList.asJava - result.setConfigs(topicConfigs) - result.setNumPartitions(assignments.size) - result.setReplicationFactor(assignments(0).size.toShort) - } - } - - private def populateIds(metadataAndConfigs: Map[String, CreatableTopicResult], - topicName: String) : Unit = { - metadataAndConfigs.get(topicName).foreach { result => - result.setTopicId(zkClient.getTopicIdsForTopics(Predef.Set(result.name())).getOrElse(result.name(), Uuid.ZERO_UUID)) - } - } - - /** - * Create topics and wait until the topics have been completely created. - * The callback function will be triggered either when timeout, error or the topics are created. - */ - def createTopics(timeout: Int, - validateOnly: Boolean, - toCreate: Map[String, CreatableTopic], - includeConfigsAndMetadata: Map[String, CreatableTopicResult], - controllerMutationQuota: ControllerMutationQuota, - responseCallback: Map[String, ApiError] => Unit): Unit = { - - // 1. map over topics creating assignment and calling zookeeper - val brokers = metadataCache.getAliveBrokers() - val metadata = toCreate.values.map(topic => - try { - if (metadataCache.contains(topic.name)) - throw new TopicExistsException(s"Topic '${topic.name}' already exists.") - - val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name) - if (nullConfigs.nonEmpty) - throw new InvalidConfigurationException(s"Null value not supported for topic configs: ${nullConfigs.mkString(",")}") - - if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR) - && !topic.assignments().isEmpty) { - throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " + - "Both cannot be used at the same time.") - } - - val resolvedNumPartitions = if (topic.numPartitions == NO_NUM_PARTITIONS) - defaultNumPartitions else topic.numPartitions - val resolvedReplicationFactor = if (topic.replicationFactor == NO_REPLICATION_FACTOR) - defaultReplicationFactor else topic.replicationFactor - - val assignments = if (topic.assignments.isEmpty) { - CoreUtils.replicaToBrokerAssignmentAsScala(AdminUtils.assignReplicasToBrokers( - brokers.asJavaCollection, resolvedNumPartitions, resolvedReplicationFactor)) - } else { - val assignments = new mutable.HashMap[Int, Seq[Int]] - // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case, - // this follows the existing logic in TopicCommand - topic.assignments.forEach { assignment => - assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int) - } - assignments - } - trace(s"Assignments for topic $topic are $assignments ") - - val configs = new Properties() - topic.configs.forEach(entry => configs.setProperty(entry.name, entry.value)) - adminZkClient.validateTopicCreate(topic.name, assignments, configs) - validateTopicCreatePolicy(topic, resolvedNumPartitions, resolvedReplicationFactor, assignments) - - // For responses with DescribeConfigs permission, populate metadata and configs. It is - // safe to populate it before creating the topic because the values are unset if the - // creation fails. - maybePopulateMetadataAndConfigs(includeConfigsAndMetadata, topic.name, configs, assignments) - - if (validateOnly) { - CreatePartitionsMetadata(topic.name, assignments.keySet) - } else { - controllerMutationQuota.record(assignments.size) - adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false) - populateIds(includeConfigsAndMetadata, topic.name) - CreatePartitionsMetadata(topic.name, assignments.keySet) - } - } catch { - // Log client errors at a lower level than unexpected exceptions - case e: TopicExistsException => - debug(s"Topic creation failed since topic '${topic.name}' already exists.", e) - CreatePartitionsMetadata(topic.name, e) - case e: ThrottlingQuotaExceededException => - debug(s"Topic creation not allowed because quota is violated. Delay time: ${e.throttleTimeMs}") - CreatePartitionsMetadata(topic.name, e) - case e: ApiException => - info(s"Error processing create topic request $topic", e) - CreatePartitionsMetadata(topic.name, e) - case e: ConfigException => - info(s"Error processing create topic request $topic", e) - CreatePartitionsMetadata(topic.name, new InvalidConfigurationException(e.getMessage, e.getCause)) - case e: Throwable => - error(s"Error processing create topic request $topic", e) - CreatePartitionsMetadata(topic.name, e) - }).toBuffer - - // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately - if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) { - val results = metadata.map { createTopicMetadata => - // ignore topics that already have errors - if (createTopicMetadata.error.isSuccess && !validateOnly) { - (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) - } else { - (createTopicMetadata.topic, createTopicMetadata.error) - } - }.toMap - responseCallback(results) - } else { - // 3. else pass the assignments and errors to the delayed operation and set the keys - val delayedCreate = new DelayedCreatePartitions(timeout, metadata, this, - responseCallback) - val delayedCreateKeys = toCreate.values.map(topic => TopicKey(topic.name)).toList - // try to complete the request immediately, otherwise put it into the purgatory - topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava) - } - } - - /** - * Delete topics and wait until the topics have been completely deleted. - * The callback function will be triggered either when timeout, error or the topics are deleted. - */ - def deleteTopics(timeout: Int, - topics: Set[String], - controllerMutationQuota: ControllerMutationQuota, - responseCallback: Map[String, Errors] => Unit): Unit = { - // 1. map over topics calling the asynchronous delete - val metadata = topics.map { topic => - try { - controllerMutationQuota.record(metadataCache.numPartitions(topic).getOrElse(0).toDouble) - adminZkClient.deleteTopic(topic) - DeleteTopicMetadata(topic, Errors.NONE) - } catch { - case _: TopicAlreadyMarkedForDeletionException => - // swallow the exception, and still track deletion allowing multiple calls to wait for deletion - DeleteTopicMetadata(topic, Errors.NONE) - case e: ThrottlingQuotaExceededException => - debug(s"Topic deletion not allowed because quota is violated. Delay time: ${e.throttleTimeMs}") - DeleteTopicMetadata(topic, e) - case e: Throwable => - error(s"Error processing delete topic request for topic $topic", e) - DeleteTopicMetadata(topic, e) - } - } - - // 2. if timeout <= 0 or no topics can proceed return immediately - if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) { - val results = metadata.map { deleteTopicMetadata => - // ignore topics that already have errors - if (deleteTopicMetadata.error == Errors.NONE) { - (deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT) - } else { - (deleteTopicMetadata.topic, deleteTopicMetadata.error) - } - }.toMap - responseCallback(results) - } else { - // 3. else pass the topics and errors to the delayed operation and set the keys - val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback) - val delayedDeleteKeys = topics.map(TopicKey).toList - // try to complete the request immediately, otherwise put it into the purgatory - topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys.asJava) - } - } - - def createPartitions(timeoutMs: Int, - newPartitions: Seq[CreatePartitionsTopic], - validateOnly: Boolean, - controllerMutationQuota: ControllerMutationQuota, - callback: Map[String, ApiError] => Unit): Unit = { - val allBrokers = adminZkClient.getBrokerMetadatas() - val allBrokerIds = allBrokers.map(_.id) - - // 1. map over topics creating assignment and calling AdminUtils - val metadata = newPartitions.map { newPartition => - val topic = newPartition.name - - try { - val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map { - case (topicPartition, assignment) => - if (assignment.isBeingReassigned) { - // We prevent adding partitions while topic reassignment is in progress, to protect from a race condition - // between the controller thread processing reassignment update and createPartitions(this) request. - throw new ReassignmentInProgressException(s"A partition reassignment is in progress for the topic '$topic'.") - } - topicPartition.partition -> assignment - } - if (existingAssignment.isEmpty) - throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.") - - val oldNumPartitions = existingAssignment.size - val newNumPartitions = newPartition.count - val numPartitionsIncrement = newNumPartitions - oldNumPartitions - if (numPartitionsIncrement < 0) { - throw new InvalidPartitionsException( - s"Topic currently has $oldNumPartitions partitions, which is higher than the requested $newNumPartitions.") - } else if (numPartitionsIncrement == 0) { - throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.") - } - - val newPartitionsAssignment = Option(newPartition.assignments).map { assignmentMap => - val assignments = assignmentMap.asScala.map { - createPartitionAssignment => createPartitionAssignment.brokerIds.asScala.map(_.toInt) - } - val unknownBrokers = assignments.flatten.toSet -- allBrokerIds - if (unknownBrokers.nonEmpty) - throw new InvalidReplicaAssignmentException( - s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.") - - if (assignments.size != numPartitionsIncrement) - throw new InvalidReplicaAssignmentException( - s"Increasing the number of partitions by $numPartitionsIncrement " + - s"but ${assignments.size} assignments provided.") - - assignments.zipWithIndex.map { case (replicas, index) => - existingAssignment.size + index -> replicas - }.toMap - } - - val assignmentForNewPartitions = adminZkClient.createNewPartitionsAssignment( - topic, existingAssignment, allBrokers, newPartition.count, newPartitionsAssignment) - - if (validateOnly) { - CreatePartitionsMetadata(topic, (existingAssignment ++ assignmentForNewPartitions).keySet) - } else { - controllerMutationQuota.record(numPartitionsIncrement) - val updatedReplicaAssignment = adminZkClient.createPartitionsWithAssignment( - topic, existingAssignment, assignmentForNewPartitions) - CreatePartitionsMetadata(topic, updatedReplicaAssignment.keySet) - } - } catch { - case e: AdminOperationException => - CreatePartitionsMetadata(topic, e) - case e: ThrottlingQuotaExceededException => - debug(s"Partition(s) creation not allowed because quota is violated. Delay time: ${e.throttleTimeMs}") - CreatePartitionsMetadata(topic, e) - case e: ApiException => - CreatePartitionsMetadata(topic, e) - } - } - - // 2. if timeout <= 0, validateOnly or no topics can proceed return immediately - if (timeoutMs <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) { - val results = metadata.map { createPartitionMetadata => - // ignore topics that already have errors - if (createPartitionMetadata.error.isSuccess && !validateOnly) { - (createPartitionMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) - } else { - (createPartitionMetadata.topic, createPartitionMetadata.error) - } - }.toMap - callback(results) - } else { - // 3. else pass the assignments and errors to the delayed operation and set the keys - val delayedCreate = new DelayedCreatePartitions(timeoutMs, metadata, this, callback) - val delayedCreateKeys = newPartitions.map(createPartitionTopic => TopicKey(createPartitionTopic.name)).toList - // try to complete the request immediately, otherwise put it into the purgatory - topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys.asJava) - } - } - - def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[ConfigResource, ApiError] = { - configs.map { case (resource, config) => - - try { - val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap - - val configProps = new Properties - config.entries.asScala.filter(_.value != null).foreach { configEntry => - configProps.setProperty(configEntry.name, configEntry.value) - } - - resource.`type` match { - case ConfigResource.Type.TOPIC => alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap) - case ConfigResource.Type.BROKER => alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) - case resourceType => - throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType") - } - } catch { - case e @ (_: ConfigException | _: IllegalArgumentException) => - val message = s"Invalid config value for resource $resource: ${e.getMessage}" - info(message) - resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e)) - case e: Throwable => - val configProps = new Properties - config.entries.asScala.filter(_.value != null).foreach { configEntry => - configProps.setProperty(configEntry.name, configEntry.value) - } - // Log client errors at a lower level than unexpected exceptions - val message = s"Error processing alter configs request for resource $resource, config ${toLoggableProps(resource, configProps).mkString(",")}" - if (e.isInstanceOf[ApiException]) - info(message, e) - else - error(message, e) - resource -> ApiError.fromThrowable(e) - } - }.toMap - } - - private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean, - configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { - val topic = resource.name - if (topic.isEmpty) { - throw new InvalidRequestException("Default topic resources are not allowed.") - } - - if (!metadataCache.contains(topic)) - throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.") - - adminZkClient.validateTopicConfig(topic, configProps) - validateConfigPolicy(resource, configEntriesMap) - if (!validateOnly) { - info(s"Updating topic $topic with new configuration : ${toLoggableProps(resource, configProps).mkString(",")}") - adminZkClient.changeTopicConfig(topic, configProps) - } - - resource -> ApiError.NONE - } - - private def alterBrokerConfigs(resource: ConfigResource, validateOnly: Boolean, - configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { - val brokerId = getBrokerId(resource) - val perBrokerConfig = brokerId.nonEmpty - this.config.dynamicConfig.validate(configProps, perBrokerConfig) - validateConfigPolicy(resource, configEntriesMap) - if (!validateOnly) { - if (perBrokerConfig) - this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps) - - if (perBrokerConfig) - info(s"Updating broker ${brokerId.get} with new configuration : ${toLoggableProps(resource, configProps).mkString(",")}") - else - info(s"Updating brokers with new configuration : ${toLoggableProps(resource, configProps).mkString(",")}") - - adminZkClient.changeBrokerConfig(brokerId, - this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig)) - } - - resource -> ApiError.NONE - } - - private def getBrokerId(resource: ConfigResource) = { - if (resource.name == null || resource.name.isEmpty) - None - else { - val id = resourceNameToBrokerId(resource.name) - if (id != this.config.brokerId) - throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}") - Some(id) - } - } - - private def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = { - alterConfigPolicy match { - case Some(policy) => - policy.validate(new AlterConfigPolicy.RequestMetadata( - new ConfigResource(resource.`type`(), resource.name), configEntriesMap.asJava)) - case None => - } - } - - def incrementalAlterConfigs(configs: Map[ConfigResource, Seq[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = { - configs.map { case (resource, alterConfigOps) => - try { - val configEntriesMap = alterConfigOps.map(entry => (entry.configEntry.name, entry.configEntry.value)).toMap - - resource.`type` match { - case ConfigResource.Type.TOPIC => - if (resource.name.isEmpty) { - throw new InvalidRequestException("Default topic resources are not allowed.") - } - val configProps = adminZkClient.fetchEntityConfig(ConfigType.TOPIC, resource.name) - prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys.asScala) - alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap) - - case ConfigResource.Type.BROKER => - val brokerId = getBrokerId(resource) - val perBrokerConfig = brokerId.nonEmpty - - val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.BROKER, brokerId.get.toString) - else adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING) - - val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) - prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys) - alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) - - case resourceType => - throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType") - } - } catch { - case e @ (_: ConfigException | _: IllegalArgumentException) => - val message = s"Invalid config value for resource $resource: ${e.getMessage}" - info(message) - resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e)) - case e: Throwable => - // Log client errors at a lower level than unexpected exceptions - val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps" - if (e.isInstanceOf[ApiException]) - info(message, e) - else - error(message, e) - resource -> ApiError.fromThrowable(e) - } - }.toMap - } - - def shutdown(): Unit = { - topicPurgatory.shutdown() - createTopicPolicy.foreach(Utils.closeQuietly(_, "create topic policy")) - alterConfigPolicy.foreach(Utils.closeQuietly(_, "alter config policy")) - } - - private def resourceNameToBrokerId(resourceName: String): Int = { - try resourceName.toInt catch { - case _: NumberFormatException => - throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName") - } - } - - private def sanitizeEntityName(entityName: String): String = - Option(entityName) match { - case None => ZooKeeperInternals.DEFAULT_STRING - case Some(name) => Sanitizer.sanitize(name) - } - - private def desanitizeEntityName(sanitizedEntityName: String): String = - sanitizedEntityName match { - case ZooKeeperInternals.DEFAULT_STRING => null - case name => Sanitizer.desanitize(name) - } - - private def parseAndSanitizeQuotaEntity(entity: ClientQuotaEntity): (Option[String], Option[String], Option[String]) = { - if (entity.entries.isEmpty) - throw new InvalidRequestException("Invalid empty client quota entity") - - var user: Option[String] = None - var clientId: Option[String] = None - var ip: Option[String] = None - entity.entries.forEach { (entityType, entityName) => - val sanitizedEntityName = Some(sanitizeEntityName(entityName)) - entityType match { - case ClientQuotaEntity.USER => user = sanitizedEntityName - case ClientQuotaEntity.CLIENT_ID => clientId = sanitizedEntityName - case ClientQuotaEntity.IP => ip = sanitizedEntityName - case _ => throw new InvalidRequestException(s"Unhandled client quota entity type: $entityType") - } - if (entityName != null && entityName.isEmpty) - throw new InvalidRequestException(s"Empty $entityType not supported") - } - (user, clientId, ip) - } - - private def userClientIdToEntity(user: Option[String], clientId: Option[String]): ClientQuotaEntity = { - new ClientQuotaEntity((user.map(u => ClientQuotaEntity.USER -> u) ++ clientId.map(c => ClientQuotaEntity.CLIENT_ID -> c)).toMap.asJava) - } - - def describeClientQuotas(filter: ClientQuotaFilter): Map[ClientQuotaEntity, Map[String, Double]] = { - var userComponent: Option[ClientQuotaFilterComponent] = None - var clientIdComponent: Option[ClientQuotaFilterComponent] = None - var ipComponent: Option[ClientQuotaFilterComponent] = None - filter.components.forEach { component => - component.entityType match { - case ClientQuotaEntity.USER => - if (userComponent.isDefined) - throw new InvalidRequestException(s"Duplicate user filter component entity type") - userComponent = Some(component) - case ClientQuotaEntity.CLIENT_ID => - if (clientIdComponent.isDefined) - throw new InvalidRequestException(s"Duplicate client filter component entity type") - clientIdComponent = Some(component) - case ClientQuotaEntity.IP => - if (ipComponent.isDefined) - throw new InvalidRequestException(s"Duplicate ip filter component entity type") - ipComponent = Some(component) - case "" => - throw new InvalidRequestException(s"Unexpected empty filter component entity type") - case et => - // Supplying other entity types is not yet supported. - throw new UnsupportedVersionException(s"Custom entity type '$et' not supported") - } - } - if ((userComponent.isDefined || clientIdComponent.isDefined) && ipComponent.isDefined) - throw new InvalidRequestException(s"Invalid entity filter component combination, IP filter component should not be used with " + - s"user or clientId filter component.") - - val userClientQuotas = if (ipComponent.isEmpty) - handleDescribeClientQuotas(userComponent, clientIdComponent, filter.strict) - else - Map.empty - - val ipQuotas = if (userComponent.isEmpty && clientIdComponent.isEmpty) - handleDescribeIpQuotas(ipComponent, filter.strict) - else - Map.empty - - (userClientQuotas ++ ipQuotas).toMap - } - - private def wantExact(component: Option[ClientQuotaFilterComponent]): Boolean = component.exists(_.`match` != null) - - private def toOption(opt: java.util.Optional[String]): Option[String] = { - if (opt == null) - None - else if (opt.isPresent) - Some(opt.get) - else - Some(null) - } - - private def sanitized(name: Option[String]): String = name.map(n => sanitizeEntityName(n)).getOrElse("") - - private def handleDescribeClientQuotas(userComponent: Option[ClientQuotaFilterComponent], - clientIdComponent: Option[ClientQuotaFilterComponent], - strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = { - - val user = userComponent.flatMap(c => toOption(c.`match`)) - val clientId = clientIdComponent.flatMap(c => toOption(c.`match`)) - - val sanitizedUser = sanitized(user) - val sanitizedClientId = sanitized(clientId) - - val exactUser = wantExact(userComponent) - val exactClientId = wantExact(clientIdComponent) - - def wantExcluded(component: Option[ClientQuotaFilterComponent]): Boolean = strict && component.isEmpty - val excludeUser = wantExcluded(userComponent) - val excludeClientId = wantExcluded(clientIdComponent) - - val userEntries = if (exactUser && excludeClientId) - Map((Some(user.get), None) -> adminZkClient.fetchEntityConfig(ConfigType.USER, sanitizedUser)) - else if (!excludeUser && !exactClientId) - adminZkClient.fetchAllEntityConfigs(ConfigType.USER).map { case (name, props) => - (Some(desanitizeEntityName(name)), None) -> props - } - else - Map.empty - - val clientIdEntries = if (excludeUser && exactClientId) - Map((None, Some(clientId.get)) -> adminZkClient.fetchEntityConfig(ConfigType.CLIENT, sanitizedClientId)) - else if (!exactUser && !excludeClientId) - adminZkClient.fetchAllEntityConfigs(ConfigType.CLIENT).map { case (name, props) => - (None, Some(desanitizeEntityName(name))) -> props - } - else - Map.empty - - val bothEntries = if (exactUser && exactClientId) - Map((Some(user.get), Some(clientId.get)) -> - adminZkClient.fetchEntityConfig(ConfigType.USER, s"${sanitizedUser}/clients/${sanitizedClientId}")) - else if (!excludeUser && !excludeClientId) - adminZkClient.fetchAllChildEntityConfigs(ConfigType.USER, ConfigType.CLIENT).map { case (name, props) => - val components = name.split("/") - if (components.size != 3 || components(1) != "clients") - throw new IllegalArgumentException(s"Unexpected config path: $name") - (Some(desanitizeEntityName(components(0))), Some(desanitizeEntityName(components(2)))) -> props - } - else - Map.empty - - def matches(nameComponent: Option[ClientQuotaFilterComponent], name: Option[String]): Boolean = nameComponent match { - case Some(component) => - toOption(component.`match`) match { - case Some(n) => name.contains(n) - case None => name.isDefined - } - case None => - name.isEmpty || !strict - } - - (userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) => - val quotaProps = p.asScala.filter { case (key, _) => QuotaConfig.isClientOrUserQuotaConfig(key) } - if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) - Some(userClientIdToEntity(u, c) -> ZkAdminManager.clientQuotaPropsToDoubleMap(quotaProps)) - else - None - }.toMap - } - - private def handleDescribeIpQuotas(ipComponent: Option[ClientQuotaFilterComponent], strict: Boolean): Map[ClientQuotaEntity, Map[String, Double]] = { - val ip = ipComponent.flatMap(c => toOption(c.`match`)) - val exactIp = wantExact(ipComponent) - val allIps = ipComponent.exists(_.`match` == null) || (ipComponent.isEmpty && !strict) - val ipEntries = if (exactIp) - Map(Some(ip.get) -> adminZkClient.fetchEntityConfig(ConfigType.IP, sanitized(ip))) - else if (allIps) - adminZkClient.fetchAllEntityConfigs(ConfigType.IP).map { case (name, props) => - Some(desanitizeEntityName(name)) -> props - } - else - Map.empty - - def ipToQuotaEntity(ip: Option[String]): ClientQuotaEntity = { - new ClientQuotaEntity(ip.map(ipName => ClientQuotaEntity.IP -> ipName).toMap.asJava) - } - - ipEntries.flatMap { case (ip, props) => - val ipQuotaProps = props.asScala.filter { case (key, _) => DynamicConfig.Ip.names.contains(key) } - if (ipQuotaProps.nonEmpty) - Some(ipToQuotaEntity(ip) -> ZkAdminManager.clientQuotaPropsToDoubleMap(ipQuotaProps)) - else - None - } - } - - def alterClientQuotas(entries: Seq[ClientQuotaAlteration], validateOnly: Boolean): Map[ClientQuotaEntity, ApiError] = { - def alterEntityQuotas(entity: ClientQuotaEntity, ops: Iterable[ClientQuotaAlteration.Op]): Unit = { - val (path, configType, configKeys, isUserClientId) = parseAndSanitizeQuotaEntity(entity) match { - case (Some(user), Some(clientId), None) => (user + "/clients/" + clientId, ConfigType.USER, DynamicConfig.User.configKeys, true) - case (Some(user), None, None) => (user, ConfigType.USER, DynamicConfig.User.configKeys, false) - case (None, Some(clientId), None) => (clientId, ConfigType.CLIENT, DynamicConfig.Client.configKeys, false) - case (None, None, Some(ip)) => - if (!DynamicConfig.Ip.isValidIpEntity(ip)) - throw new InvalidRequestException(s"$ip is not a valid IP or resolvable host.") - (ip, ConfigType.IP, DynamicConfig.Ip.configKeys, false) - case (_, _, Some(_)) => throw new InvalidRequestException(s"Invalid quota entity combination, " + - s"IP entity should not be used with user/client ID entity.") - case _ => throw new InvalidRequestException("Invalid client quota entity") - } - - val props = adminZkClient.fetchEntityConfig(configType, path) - ops.foreach { op => - op.value match { - case null => - props.remove(op.key) - case value => configKeys.get(op.key) match { - case null => - throw new InvalidRequestException(s"Invalid configuration key ${op.key}") - case key => key.`type` match { - case ConfigDef.Type.DOUBLE => - props.setProperty(op.key, value.toString) - case ConfigDef.Type.LONG | ConfigDef.Type.INT => - val epsilon = 1e-6 - val intValue = if (key.`type` == ConfigDef.Type.LONG) - (value + epsilon).toLong - else - (value + epsilon).toInt - if ((intValue.toDouble - value).abs > epsilon) - throw new InvalidRequestException(s"Configuration ${op.key} must be a ${key.`type`} value") - props.setProperty(op.key, intValue.toString) - case _ => - throw new IllegalStateException(s"Unexpected config type ${key.`type`}") - } - } - } - } - if (!validateOnly) - adminZkClient.changeConfigs(configType, path, props, isUserClientId) - } - entries.map { entry => - val apiError = try { - alterEntityQuotas(entry.entity, entry.ops.asScala) - ApiError.NONE - } catch { - case e: Throwable => - info(s"Error encountered while updating client quotas", e) - ApiError.fromThrowable(e) - } - entry.entity -> apiError - }.toMap - } - - private val usernameMustNotBeEmptyMsg = "Username must not be empty" - private val errorProcessingDescribe = "Error processing describe user SCRAM credential configs request" - private val attemptToDescribeUserThatDoesNotExist = "Attempt to describe a user credential that does not exist" - - def describeUserScramCredentials(users: Option[Seq[String]]): DescribeUserScramCredentialsResponseData = { - val describingAllUsers = users.isEmpty || users.get.isEmpty - val retval = new DescribeUserScramCredentialsResponseData() - val userResults = mutable.Map[String, DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult]() - - def addToResultsIfHasScramCredential(user: String, userConfig: Properties, explicitUser: Boolean = false): Unit = { - val result = new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult().setUser(user) - val configKeys = userConfig.stringPropertyNames - val hasScramCredential = ScramMechanism.values().toList.exists(key => key != ScramMechanism.UNKNOWN && configKeys.contains(key.mechanismName)) - if (hasScramCredential) { - val credentialInfos = new util.ArrayList[CredentialInfo] - try { - ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => - val propertyValue = userConfig.getProperty(mechanism.mechanismName) - if (propertyValue != null) { - val iterations = ScramCredentialUtils.credentialFromString(propertyValue).iterations - credentialInfos.add(new CredentialInfo().setMechanism(mechanism.`type`).setIterations(iterations)) - } - } - result.setCredentialInfos(credentialInfos) - } catch { - case e: Exception => { // should generally never happen, but just in case bad data gets in... - val apiError = apiErrorFrom(e, errorProcessingDescribe) - result.setErrorCode(apiError.error.code).setErrorMessage(apiError.error.message) - } - } - userResults += (user -> result) - } else if (explicitUser) { - // it is an error to request credentials for a user that has no credentials - result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code).setErrorMessage(s"$attemptToDescribeUserThatDoesNotExist: $user") - userResults += (user -> result) - } - } - - def collectRetrievedResults(): Unit = { - if (describingAllUsers) { - val usersSorted = SortedSet.empty[String] ++ userResults.keys - usersSorted.foreach { user => retval.results.add(userResults(user)) } - } else { - // be sure to only include a single copy of a result for any user requested multiple times - users.get.distinct.foreach { user => retval.results.add(userResults(user)) } - } - } - - try { - if (describingAllUsers) - adminZkClient.fetchAllEntityConfigs(ConfigType.USER).foreach { - case (user, properties) => addToResultsIfHasScramCredential(Sanitizer.desanitize(user), properties) } - else { - // describing specific users - val illegalUsers = users.get.filter(_.isEmpty).toSet - illegalUsers.foreach { user => - userResults += (user -> new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult() - .setUser(user) - .setErrorCode(Errors.RESOURCE_NOT_FOUND.code) - .setErrorMessage(usernameMustNotBeEmptyMsg)) } - val duplicatedUsers = users.get.groupBy(identity).filter( - userAndOccurrencesTuple => userAndOccurrencesTuple._2.length > 1).keys - duplicatedUsers.filterNot(illegalUsers.contains).foreach { user => - userResults += (user -> new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult() - .setUser(user) - .setErrorCode(Errors.DUPLICATE_RESOURCE.code) - .setErrorMessage(s"Cannot describe SCRAM credentials for the same user twice in a single request: $user")) } - val usersToSkip = illegalUsers ++ duplicatedUsers - users.get.filterNot(usersToSkip.contains).foreach { user => - try { - val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) - addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) - } catch { - case e: Exception => { - val apiError = apiErrorFrom(e, errorProcessingDescribe) - userResults += (user -> new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult() - .setUser(user) - .setErrorCode(apiError.error.code) - .setErrorMessage(apiError.error.message)) - } - } - } - } - collectRetrievedResults() - } catch { - case e: Exception => { - // this should generally only happen when we get a failure trying to retrieve all user configs from ZooKeeper - val apiError = apiErrorFrom(e, errorProcessingDescribe) - retval.setErrorCode(apiError.error.code).setErrorMessage(apiError.messageWithFallback()) - } - } - retval - } - - private def apiErrorFrom(e: Exception, message: String): ApiError = { - if (e.isInstanceOf[ApiException]) - info(message, e) - else - error(message, e) - ApiError.fromThrowable(e) - } - - private case class requestStatus(user: String, mechanism: Option[ScramMechanism], legalRequest: Boolean, iterations: Int) {} - - def alterUserScramCredentials(upsertions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion], - deletions: Seq[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]): AlterUserScramCredentialsResponseData = { - - def scramMechanism(mechanism: Byte): ScramMechanism = { - ScramMechanism.fromType(mechanism) - } - - def mechanismName(mechanism: Byte): String = { - scramMechanism(mechanism).mechanismName - } - - val retval = new AlterUserScramCredentialsResponseData() - - // fail any user that is invalid due to an empty user name, an unknown SCRAM mechanism, or unacceptable number of iterations - val maxIterations = 16384 - val illegalUpsertions = upsertions.map(upsertion => - if (upsertion.name.isEmpty) - requestStatus(upsertion.name, None, legalRequest = false, upsertion.iterations) // no determined mechanism -- empty user is the cause of failure - else { - val publicScramMechanism = scramMechanism(upsertion.mechanism) - if (publicScramMechanism == ScramMechanism.UNKNOWN) { - requestStatus(upsertion.name, Some(publicScramMechanism), legalRequest = false, upsertion.iterations) // unknown mechanism is the cause of failure - } else { - if (upsertion.iterations < InternalScramMechanism.forMechanismName(publicScramMechanism.mechanismName).minIterations - || upsertion.iterations > maxIterations) { - requestStatus(upsertion.name, Some(publicScramMechanism), legalRequest = false, upsertion.iterations) // known mechanism, bad iterations is the cause of failure - } else { - requestStatus(upsertion.name, Some(publicScramMechanism), legalRequest = true, upsertion.iterations) // legal - } - } - }).filter { !_.legalRequest } - val illegalDeletions = deletions.map(deletion => - if (deletion.name.isEmpty) { - requestStatus(deletion.name, None, legalRequest = false, 0) // no determined mechanism -- empty user is the cause of failure - } else { - val publicScramMechanism = scramMechanism(deletion.mechanism) - requestStatus(deletion.name, Some(publicScramMechanism), publicScramMechanism != ScramMechanism.UNKNOWN, 0) - }).filter { !_.legalRequest } - // map user names to error messages - val unknownScramMechanismMsg = "Unknown SCRAM mechanism" - val tooFewIterationsMsg = "Too few iterations" - val tooManyIterationsMsg = "Too many iterations" - val illegalRequestsByUser = - illegalDeletions.map(requestStatus => - if (requestStatus.user.isEmpty) { - (requestStatus.user, usernameMustNotBeEmptyMsg) - } else { - (requestStatus.user, unknownScramMechanismMsg) - } - ).toMap ++ illegalUpsertions.map(requestStatus => - if (requestStatus.user.isEmpty) { - (requestStatus.user, usernameMustNotBeEmptyMsg) - } else if (requestStatus.mechanism.contains(ScramMechanism.UNKNOWN)) { - (requestStatus.user, unknownScramMechanismMsg) - } else { - (requestStatus.user, if (requestStatus.iterations > maxIterations) {tooManyIterationsMsg} else tooFewIterationsMsg) - } - ).toMap - - illegalRequestsByUser.foreachEntry { (user, errorMessage) => - retval.results.add(new AlterUserScramCredentialsResult().setUser(user) - .setErrorCode(if (errorMessage == unknownScramMechanismMsg) {Errors.UNSUPPORTED_SASL_MECHANISM.code} else {Errors.UNACCEPTABLE_CREDENTIAL.code}) - .setErrorMessage(errorMessage)) } - - val invalidUsers = (illegalUpsertions ++ illegalDeletions).map(_.user).toSet - val initiallyValidUserMechanismPairs = upsertions.filter(upsertion => !invalidUsers.contains(upsertion.name)).map(upsertion => (upsertion.name, upsertion.mechanism)) ++ - deletions.filter(deletion => !invalidUsers.contains(deletion.name)).map(deletion => (deletion.name, deletion.mechanism)) - - val usersWithDuplicateUserMechanismPairs = initiallyValidUserMechanismPairs.groupBy(identity).filter( - userMechanismPairAndOccurrencesTuple => userMechanismPairAndOccurrencesTuple._2.length > 1).keys.map(userMechanismPair => userMechanismPair._1).toSet - usersWithDuplicateUserMechanismPairs.foreach { user => - retval.results.add(new AlterUserScramCredentialsResult() - .setUser(user) - .setErrorCode(Errors.DUPLICATE_RESOURCE.code).setErrorMessage("A user credential cannot be altered twice in the same request")) } - - def potentiallyValidUserMechanismPairs = initiallyValidUserMechanismPairs.filter(pair => !usersWithDuplicateUserMechanismPairs.contains(pair._1)) - - val potentiallyValidUsers = potentiallyValidUserMechanismPairs.map(_._1).toSet - val configsByPotentiallyValidUser = potentiallyValidUsers.map(user => (user, adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)))).toMap - - // check for deletion of a credential that does not exist - val invalidDeletions = deletions.filter(deletion => potentiallyValidUsers.contains(deletion.name)).filter(deletion => - configsByPotentiallyValidUser(deletion.name).getProperty(mechanismName(deletion.mechanism)) == null) - val invalidUsersDueToInvalidDeletions = invalidDeletions.map(_.name).toSet - invalidUsersDueToInvalidDeletions.foreach { user => - retval.results.add(new AlterUserScramCredentialsResult() - .setUser(user) - .setErrorCode(Errors.RESOURCE_NOT_FOUND.code).setErrorMessage("Attempt to delete a user credential that does not exist")) } - - // now prepare the new set of property values for users that don't have any issues identified above, - // keeping track of ones that fail - val usersToTryToAlter = potentiallyValidUsers.diff(invalidUsersDueToInvalidDeletions) - val usersFailedToPrepareProperties = usersToTryToAlter.map(user => { - try { - // deletions: remove property keys - deletions.filter(deletion => usersToTryToAlter.contains(deletion.name)).foreach { deletion => - configsByPotentiallyValidUser(deletion.name).remove(mechanismName(deletion.mechanism)) } - // upsertions: put property key/value - upsertions.filter(upsertion => usersToTryToAlter.contains(upsertion.name)).foreach { upsertion => - val mechanism = InternalScramMechanism.forMechanismName(mechanismName(upsertion.mechanism)) - val credential = new ScramFormatter(mechanism) - .generateCredential(upsertion.salt, upsertion.saltedPassword, upsertion.iterations) - configsByPotentiallyValidUser(upsertion.name).put(mechanismName(upsertion.mechanism), ScramCredentialUtils.credentialToString(credential)) } - (user) // success, 1 element, won't be matched - } catch { - case e: Exception => - info(s"Error encountered while altering user SCRAM credentials", e) - (user, e) // fail, 2 elements, will be matched - } - }).collect { case (user: String, exception: Exception) => (user, exception) }.toMap - - // now persist the properties we have prepared, again keeping track of whatever fails - val usersFailedToPersist = usersToTryToAlter.filterNot(usersFailedToPrepareProperties.contains).map(user => { - try { - adminZkClient.changeConfigs(ConfigType.USER, Sanitizer.sanitize(user), configsByPotentiallyValidUser(user)) - (user) // success, 1 element, won't be matched - } catch { - case e: Exception => - info(s"Error encountered while altering user SCRAM credentials", e) - (user, e) // fail, 2 elements, will be matched - } - }).collect { case (user: String, exception: Exception) => (user, exception) }.toMap - - // report failures - usersFailedToPrepareProperties.++(usersFailedToPersist).foreachEntry { (user, exception) => - val error = Errors.forException(exception) - retval.results.add(new AlterUserScramCredentialsResult() - .setUser(user) - .setErrorCode(error.code) - .setErrorMessage(error.message)) } - - // report successes - usersToTryToAlter.filterNot(usersFailedToPrepareProperties.contains).filterNot(usersFailedToPersist.contains).foreach { user => - retval.results.add(new AlterUserScramCredentialsResult() - .setUser(user) - .setErrorCode(Errors.NONE.code)) } - - retval - } -} diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala deleted file mode 100644 index 74e73512961..00000000000 --- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server.metadata - -import java.util.Properties -import kafka.zk.{AdminZkClient, KafkaZkClient} -import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.config.ConfigResource.Type -import org.apache.kafka.common.errors.InvalidRequestException -import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals} - - -object ZkConfigRepository { - def apply(zkClient: KafkaZkClient): ZkConfigRepository = - new ZkConfigRepository(new AdminZkClient(zkClient)) -} - -class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository { - override def config(configResource: ConfigResource): Properties = { - val configTypeForZk = configResource.`type` match { - case Type.TOPIC => ConfigType.TOPIC - case Type.BROKER => ConfigType.BROKER - case Type.CLIENT_METRICS => throw new InvalidRequestException("Config type client-metrics is only supported on KRaft clusters") - case Type.GROUP => throw new InvalidRequestException("Config type groups is only supported on KRaft clusters") - case tpe => throw new IllegalArgumentException(s"Unsupported config type: $tpe") - } - // ZK stores cluster configs under "<default>". - val effectiveName = if (configResource.`type`.equals(Type.BROKER) && - configResource.name.isEmpty) { - ZooKeeperInternals.DEFAULT_STRING - } else { - configResource.name - } - adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName) - } -}