This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68a1953 KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
68a1953 is described below
commit 68a19539cf1fcd86787960d0010b672d0d611b91
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Jan 21 16:00:21 2022 -0800
KAFKA-13552: Fix BROKER and BROKER_LOGGER in KRaft (#11657)
Currently, KRaft does not support setting BROKER_LOGGER configs (it always
fails). Additionally, there are several bugs in the handling of BROKER
configs: they are not properly validated on the forwarding broker, and the
way we apply them is buggy as well. This PR fixes those issues.
KafkaApis: add support for doing validation and log4j processing on the
forwarding broker. This involves breaking the config request apart and
forwarding only part of it. Adjust KafkaApisTest to test the new behavior,
rather than expecting forwarding of the full request.
MetadataSupport: remove MetadataSupport#controllerId since it duplicates
the functionality of MetadataCache#controllerId. Add support for
getResourceConfig and maybeForward.
ControllerApis: log an error message if the handler throws an exception,
just like we do in KafkaApis.
ControllerConfigurationValidator: add JavaDoc.
Move some functions that don't involve ZK from ZkAdminManager to
DynamicConfigManager. Move some validation out of ZkAdminManager and into
a new class, ConfigAdminManager, which is not tied to ZK.
ForwardingManager: add support for sending new requests, rather than just
forwarding existing requests.
BrokerMetadataPublisher: do not try to apply dynamic configurations for
brokers other than the current one. Log an INFO message when applying a
new dynamic config, like we do in ZK mode. Also, invoke
reloadUpdatedFilesWithoutConfigChange when applying a new non-default
BROKER config.
QuorumController: fix a bug in ConfigResourceExistenceChecker which
prevented cluster configs from being set. Add a test for this class.
Reviews: José Armando García Sancio <[email protected]>
---
.../requests/IncrementalAlterConfigsRequest.java | 2 +-
.../scala/kafka/server/ConfigAdminManager.scala | 518 +++++++++++++++++++++
.../main/scala/kafka/server/ConfigHandler.scala | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 8 +-
.../server/ControllerConfigurationValidator.scala | 14 +
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 -
.../scala/kafka/server/ForwardingManager.scala | 76 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 128 +++--
core/src/main/scala/kafka/server/KafkaServer.scala | 7 +-
.../main/scala/kafka/server/MetadataSupport.scala | 45 +-
.../main/scala/kafka/server/ZkAdminManager.scala | 121 +----
...icConfigManager.scala => ZkConfigManager.scala} | 13 +-
.../server/metadata/BrokerMetadataListener.scala | 3 +
.../server/metadata/BrokerMetadataPublisher.scala | 49 +-
.../kafka/server/metadata/ZkConfigRepository.scala | 11 +-
.../server/DynamicBrokerReconfigurationTest.scala | 16 +-
.../kafka/server/KRaftClusterTest.scala | 260 ++++++++++-
.../unit/kafka/server/ConfigAdminManagerTest.scala | 466 ++++++++++++++++++
.../kafka/server/DynamicConfigChangeTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 93 +++-
.../apache/kafka/controller/QuorumController.java | 8 +-
.../kafka/controller/QuorumControllerTest.java | 39 ++
22 files changed, 1617 insertions(+), 266 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 2bc5914..9433e31 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -80,7 +80,7 @@ public class IncrementalAlterConfigsRequest extends
AbstractRequest {
private final IncrementalAlterConfigsRequestData data;
private final short version;
- private IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData
data, short version) {
+ public IncrementalAlterConfigsRequest(IncrementalAlterConfigsRequestData
data, short version) {
super(ApiKeys.INCREMENTAL_ALTER_CONFIGS, version);
this.data = data;
this.version = version;
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
new file mode 100644
index 0000000..a7f5c6b
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -0,0 +1,518 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.server.metadata.ConfigRepository
+import kafka.utils.Log4jController
+import kafka.utils._
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER, TOPIC}
+import org.apache.kafka.common.config.{ConfigDef, ConfigResource,
LogLevelConfig}
+import org.apache.kafka.common.errors.{ApiException,
ClusterAuthorizationException, InvalidConfigurationException,
InvalidRequestException}
+import org.apache.kafka.common.message.{AlterConfigsRequestData,
AlterConfigsResponseData, IncrementalAlterConfigsRequestData,
IncrementalAlterConfigsResponseData}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource
=> LAlterConfigsResource}
+import
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
=> LAlterConfigsResourceResponse}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource, AlterableConfig => IAlterableConfig}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
=> IAlterConfigsResourceResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST,
UNKNOWN_SERVER_ERROR}
+import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.slf4j.LoggerFactory
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+/**
+ * Manages dynamic configuration operations on the broker.
+ *
+ * There are two RPCs that alter KIP-226 dynamic configurations: alterConfigs,
and
+ * incrementalAlterConfigs. The main difference between the two is that
alterConfigs sets
+ * all configurations related to a specific config resource, whereas
+ * incrementalAlterConfigs makes only designated changes.
+ *
+ * The original, non-incremental AlterConfigs is deprecated because there are
inherent
+ * race conditions when multiple clients use it. It deletes any resource
configuration
+ * keys that are not specified. This leads to clients trying to do
read-modify-write
+ * cycles when they only want to change one config key. (But even
read-modify-write doesn't
+ * work correctly, since "sensitive" configurations are omitted when read.)
+ *
+ * KIP-412 added support for changing log4j log levels via
IncrementalAlterConfigs, but
+ * not via the original AlterConfigs. In retrospect, this would have been
better off as a
+ * separate RPC, since the semantics are quite different. In particular,
KIP-226 configs
+ * are stored durably (in ZK or KRaft) and persist across broker restarts, but
KIP-412
+ * log4j levels do not. However, we have to handle it here now in order to
maintain
+ * compatibility.
+ *
+ * Configuration processing is split into two parts.
+ * - The first step, called "preprocessing," handles setting KIP-412 log levels and
+ * validating BROKER configurations. We also filter out some other things here, such as
+ * UNKNOWN resource types.
+ * - The second step is "persistence," and handles storing the configurations
durably to our
+ * metadata store.
+ *
+ * When KIP-590 forwarding is active (such as in KRaft mode), preprocessing
will happen
+ * on the broker, while persistence will happen on the active controller. (If
KIP-590
+ * forwarding is not active, then both steps are done on the same broker.)
+ *
+ * In KRaft mode, the active controller performs its own configuration
validation step in
+ * {@link kafka.server.ControllerConfigurationValidator}. This is mainly
important for
+ * TOPIC resources, since we already validated changes to BROKER resources on
the
+ * forwarding broker. The KRaft controller is also responsible for enforcing
the configured
+ * {@link org.apache.kafka.server.policy.AlterConfigPolicy}.
+ */
+class ConfigAdminManager(nodeId: Int,
+ conf: KafkaConfig,
+ configRepository: ConfigRepository) extends Logging {
+ import ConfigAdminManager._
+
+ this.logIdent = "[ConfigAdminManager[nodeId=" + nodeId + "]: "
+
+ /**
+ * Preprocess an incremental configuration operation on the broker. This
step handles
+ * setting log4j levels, as well as filtering out some invalid resource
requests that
+ * should not be forwarded to the controller.
+ *
+ * @param request The request data.
+ * @param authorize A callback which is invoked when we need to authorize
an operation.
+ * Currently, we only use this for log4j operations.
Other types of
+ * operations are authorized in the persistence step. The
arguments
+ * are the type and name of the resource to be authorized.
+ *
+ * @return A map from resources to errors. If a resource appears
in this map,
+ * it has been preprocessed and does not need further
processing.
+ */
+ def preprocess(
+ request: IncrementalAlterConfigsRequestData,
+ authorize: (ResourceType, String) => Boolean
+ ): util.IdentityHashMap[IAlterConfigsResource, ApiError] = {
+ val results = new util.IdentityHashMap[IAlterConfigsResource, ApiError]()
+ val resourceIds = new util.HashMap[(Byte, String), IAlterConfigsResource]
+ request.resources().forEach(resource => {
+ val preexisting = resourceIds.put((resource.resourceType(),
resource.resourceName()), resource)
+ if (preexisting != null) {
+ Seq(preexisting, resource).foreach(
+ r => results.put(r, new ApiError(INVALID_REQUEST, "Each resource
must appear at most once.")))
+ }
+ })
+ request.resources().forEach(resource => {
+ if (!results.containsKey(resource)) {
+ val resourceType = ConfigResource.Type.forId(resource.resourceType())
+ val configResource = new ConfigResource(resourceType,
resource.resourceName())
+ try {
+ if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
+ throw new InvalidRequestException("Error due to duplicate config
keys")
+ }
+ val nullUpdates = new util.ArrayList[String]()
+ resource.configs().forEach { config =>
+ if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
+ config.value() == null) {
+ nullUpdates.add(config.name())
+ }
+ }
+ if (!nullUpdates.isEmpty()) {
+ throw new InvalidRequestException("Null value not supported for :
" +
+ String.join(", ", nullUpdates))
+ }
+ resourceType match {
+ case BROKER_LOGGER =>
+ if (!authorize(ResourceType.CLUSTER, Resource.CLUSTER_NAME)) {
+ throw new
ClusterAuthorizationException(Errors.CLUSTER_AUTHORIZATION_FAILED.message())
+ }
+ validateResourceNameIsCurrentNodeId(resource.resourceName())
+ validateLogLevelConfigs(resource.configs())
+ if (!request.validateOnly()) {
+ alterLogLevelConfigs(resource.configs())
+ }
+ results.put(resource, ApiError.NONE)
+ case BROKER =>
+ // The resource name must be either blank (if setting a cluster
config) or
+ // the ID of this specific broker.
+ if (!configResource.name().isEmpty) {
+ validateResourceNameIsCurrentNodeId(resource.resourceName())
+ }
+ validateBrokerConfigChange(resource, configResource)
+ case TOPIC =>
+ // Nothing to do.
+ case _ =>
+ throw new InvalidRequestException(s"Unknown resource type
${resource.resourceType().toInt}")
+ }
+ } catch {
+ case t: Throwable => {
+ val err = ApiError.fromThrowable(t)
+ info(s"Error preprocessing incrementalAlterConfigs request on
${configResource}", t)
+ results.put(resource, err)
+ }
+ }
+ }
+ })
+ results
+ }
+
+ def validateBrokerConfigChange(
+ resource: IAlterConfigsResource,
+ configResource: ConfigResource
+ ): Unit = {
+ val perBrokerConfig = !configResource.name().isEmpty
+ val persistentProps = configRepository.config(configResource)
+ val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps,
perBrokerConfig)
+ val alterConfigOps = resource.configs().asScala.map {
+ case config =>
+ val opType = AlterConfigOp.OpType.forId(config.configOperation())
+ if (opType == null) {
+ throw new InvalidRequestException(s"Unknown operations type
${config.configOperation}")
+ }
+ new AlterConfigOp(new ConfigEntry(config.name(), config.value()),
opType)
+ }.toSeq
+ prepareIncrementalConfigs(alterConfigOps, configProps,
KafkaConfig.configKeys)
+ try {
+ validateBrokerConfigChange(configProps, configResource)
+ } catch {
+ case t: Throwable => error(s"validation of configProps ${configProps}
for ${configResource} failed with exception", t)
+ throw t
+ }
+ }
+
+ def validateBrokerConfigChange(
+ props: Properties,
+ configResource: ConfigResource
+ ): Unit = {
+ try {
+ conf.dynamicConfig.validate(props, !configResource.name().isEmpty)
+ } catch {
+ case e: ApiException => throw e
+ //KAFKA-13609: InvalidRequestException is not really the right exception
here if the
+ // configuration fails validation. The configuration is still
well-formed, but just
+ // can't be applied. It should probably throw
InvalidConfigurationException. However,
+ // we should probably only change this in a KIP since it has
compatibility implications.
+ case e: Throwable => throw new InvalidRequestException(e.getMessage)
+ }
+ }
+
+ /**
+ * Preprocess a legacy configuration operation on the broker.
+ *
+ * @param request The request data.
+ *
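+ * @return A map from resources to errors. Resources in this map failed preprocessing and need no further processing.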
+ */
+ def preprocess(
+ request: AlterConfigsRequestData,
+ ): util.IdentityHashMap[LAlterConfigsResource, ApiError] = {
+ val results = new util.IdentityHashMap[LAlterConfigsResource, ApiError]()
+ val resourceIds = new util.HashMap[(Byte, String), LAlterConfigsResource]
+ request.resources().forEach(resource => {
+ val preexisting = resourceIds.put((resource.resourceType(),
resource.resourceName()), resource)
+ if (preexisting != null) {
+ Seq(preexisting, resource).foreach(
+ r => results.put(r, new ApiError(INVALID_REQUEST, "Each resource
must appear at most once.")))
+ }
+ })
+ request.resources().forEach(resource => {
+ if (!results.containsKey(resource)) {
+ val resourceType = ConfigResource.Type.forId(resource.resourceType())
+ val configResource = new ConfigResource(resourceType,
resource.resourceName())
+ try {
+ if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
+ throw new InvalidRequestException("Error due to duplicate config
keys")
+ }
+ val nullUpdates = new util.ArrayList[String]()
+ resource.configs().forEach { config =>
+ if (config.value() == null) {
+ nullUpdates.add(config.name())
+ }
+ }
+ if (!nullUpdates.isEmpty()) {
+ throw new InvalidRequestException("Null value not supported for :
" +
+ String.join(", ", nullUpdates))
+ }
+ resourceType match {
+ case BROKER =>
+ if (!configResource.name().isEmpty) {
+ validateResourceNameIsCurrentNodeId(resource.resourceName())
+ }
+ validateBrokerConfigChange(resource, configResource)
+ case TOPIC =>
+ // Nothing to do.
+ case _ =>
+ // Since legacy AlterConfigs does not support BROKER_LOGGER, any
attempt to use it
+ // gets caught by this clause.
+ throw new InvalidRequestException(s"Unknown resource type
${resource.resourceType().toInt}")
+ }
+ } catch {
+ case t: Throwable => {
+ val err = ApiError.fromThrowable(t)
+ info(s"Error preprocessing alterConfigs request on
${configResource}: ${err}")
+ results.put(resource, err)
+ }
+ }
+ }
+ })
+ results
+ }
+
+ def validateBrokerConfigChange(
+ resource: LAlterConfigsResource,
+ configResource: ConfigResource
+ ): Unit = {
+ val props = new Properties()
+ resource.configs().forEach {
+ config => props.setProperty(config.name(), config.value())
+ }
+ validateBrokerConfigChange(props, configResource)
+ }
+
+ def validateResourceNameIsCurrentNodeId(name: String): Unit = {
+ val id = try name.toInt catch {
+ case _: NumberFormatException =>
+ throw new InvalidRequestException(s"Node id must be an integer, but it
is: $name")
+ }
+ if (id != nodeId) {
+ throw new InvalidRequestException(s"Unexpected broker id, expected
${nodeId}, but received ${name}")
+ }
+ }
+
+ def validateLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
+ def validateLoggerNameExists(loggerName: String): Unit = {
+ if (!Log4jController.loggerExists(loggerName)) {
+ throw new InvalidConfigurationException(s"Logger $loggerName does not
exist!")
+ }
+ }
+ ops.forEach { op =>
+ val loggerName = op.name
+ OpType.forId(op.configOperation()) match {
+ case OpType.SET =>
+ validateLoggerNameExists(loggerName)
+ val logLevel = op.value()
+ if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
+ val validLevelsStr =
LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
+ throw new InvalidConfigurationException(
+ s"Cannot set the log level of $loggerName to $logLevel as it is
not a supported log level. " +
+ s"Valid log levels are $validLevelsStr"
+ )
+ }
+ case OpType.DELETE =>
+ validateLoggerNameExists(loggerName)
+ if (loggerName == Log4jController.ROOT_LOGGER)
+ throw new InvalidRequestException(s"Removing the log level of the
${Log4jController.ROOT_LOGGER} logger is not allowed")
+ case OpType.APPEND => throw new
InvalidRequestException(s"${OpType.APPEND} " +
+ s"operation is not allowed for the ${BROKER_LOGGER} resource")
+ case OpType.SUBTRACT => throw new
InvalidRequestException(s"${OpType.SUBTRACT} " +
+ s"operation is not allowed for the ${BROKER_LOGGER} resource")
+ case _ => throw new InvalidRequestException(s"Unknown operation type
${op.configOperation()} " +
+ s"is not allowed for the ${BROKER_LOGGER} resource")
+ }
+ }
+ }
+
+ def alterLogLevelConfigs(ops: util.Collection[IAlterableConfig]): Unit = {
+ ops.forEach { op =>
+ val loggerName = op.name()
+ val logLevel = op.value()
+ OpType.forId(op.configOperation()) match {
+ case OpType.SET =>
+ info(s"Updating the log level of $loggerName to $logLevel")
+ Log4jController.logLevel(loggerName, logLevel)
+ case OpType.DELETE =>
+ info(s"Unset the log level of $loggerName")
+ Log4jController.unsetLogLevel(loggerName)
+ case _ => throw new IllegalArgumentException(
+ s"Invalid log4j configOperation: ${op.configOperation()}")
+ }
+ }
+ }
+}
+
+object ConfigAdminManager {
+ val log = LoggerFactory.getLogger(classOf[ConfigAdminManager])
+
+ /**
+ * Copy the incremental configs request data without any already-processed
elements.
+ *
+ * @param request The input request. Will not be modified.
+ * @param processed A map containing the resources that have already been
processed.
+ * @return A new request object.
+ */
+ def copyWithoutPreprocessed(
+ request: IncrementalAlterConfigsRequestData,
+ processed: util.IdentityHashMap[IAlterConfigsResource, ApiError]
+ ): IncrementalAlterConfigsRequestData = {
+ val copy = new IncrementalAlterConfigsRequestData().
+ setValidateOnly(request.validateOnly())
+ request.resources().forEach(resource => {
+ if (!processed.containsKey(resource)) {
+ copy.resources().mustAdd(resource.duplicate())
+ }
+ })
+ copy
+ }
+
+ /**
+ * Copy the legacy alter configs request data without any already-processed
elements.
+ *
+ * @param request The input request. Will not be modified.
+ * @param processed A map containing the resources that have already been
processed.
+ * @return A new request object.
+ */
+ def copyWithoutPreprocessed(
+ request: AlterConfigsRequestData,
+ processed: util.IdentityHashMap[LAlterConfigsResource, ApiError]
+ ): AlterConfigsRequestData = {
+ val copy = new AlterConfigsRequestData().
+ setValidateOnly(request.validateOnly())
+ request.resources().forEach(resource => {
+ if (!processed.containsKey(resource)) {
+ copy.resources().mustAdd(resource.duplicate())
+ }
+ })
+ copy
+ }
+
+ def reassembleIncrementalResponse(
+ original: IncrementalAlterConfigsRequestData,
+ preprocessingResponses: util.IdentityHashMap[IAlterConfigsResource,
ApiError],
+ persistentResponses: IncrementalAlterConfigsResponseData
+ ): IncrementalAlterConfigsResponseData = {
+ val response = new IncrementalAlterConfigsResponseData()
+ val responsesByResource =
persistentResponses.responses().iterator().asScala.map {
+ case r => (r.resourceName(), r.resourceType()) -> new
ApiError(r.errorCode(), r.errorMessage())
+ }.toMap
+ original.resources().forEach(r => {
+ val err = Option(preprocessingResponses.get(r)) match {
+ case None =>
+ responsesByResource.get((r.resourceName(), r.resourceType())) match {
+ case None => log.error("The controller returned fewer results than
we " +
+ s"expected. No response found for ${r}.")
+ new ApiError(UNKNOWN_SERVER_ERROR)
+ case Some(err) => err
+ }
+ case Some(err) => err
+ }
+ response.responses().add(new IAlterConfigsResourceResponse().
+ setResourceName(r.resourceName()).
+ setResourceType(r.resourceType()).
+ setErrorCode(err.error().code()).
+ setErrorMessage(err.message()))
+ })
+ response
+ }
+
+ def reassembleLegacyResponse(
+ original: AlterConfigsRequestData,
+ preprocessingResponses: util.IdentityHashMap[LAlterConfigsResource,
ApiError],
+ persistentResponses: AlterConfigsResponseData
+ ): AlterConfigsResponseData = {
+ val response = new AlterConfigsResponseData()
+ val responsesByResource =
persistentResponses.responses().iterator().asScala.map {
+ case r => (r.resourceName(), r.resourceType()) -> new
ApiError(r.errorCode(), r.errorMessage())
+ }.toMap
+ original.resources().forEach(r => {
+ val err = Option(preprocessingResponses.get(r)) match {
+ case None =>
+ responsesByResource.get((r.resourceName(), r.resourceType())) match {
+ case None => log.error("The controller returned fewer results than
we " +
+ s"expected. No response found for ${r}.")
+ new ApiError(UNKNOWN_SERVER_ERROR)
+ case Some(err) => err
+ }
+ case Some(err) => err
+ }
+ response.responses().add(new LAlterConfigsResourceResponse().
+ setResourceName(r.resourceName()).
+ setResourceType(r.resourceType()).
+ setErrorCode(err.error().code()).
+ setErrorMessage(err.message()))
+ })
+ response
+ }
+
+ def containsDuplicates[T](
+ iterable: Iterable[T]
+ ): Boolean = {
+ val previous = new util.HashSet[T]()
+ !iterable.forall(previous.add(_))
+ }
+
+ /**
+ * Convert the configuration properties for an object (broker, topic, etc.)
to a Scala
+ * map. Sensitive configurations will be redacted, so that the output is
suitable for
+ * logging.
+ *
+ * @param resource The configuration resource.
+ * @param configProps The configuration as a Properties object.
+ * @return A map containing all the configuration keys and
values, as they
+ * should be logged.
+ */
+ def toLoggableProps(resource: ConfigResource, configProps: Properties):
Map[String, String] = {
+ configProps.asScala.map {
+ case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`,
key, value))
+ }
+ }
+
+ /**
+ * Apply a series of incremental configuration operations to a set of
resource properties.
+ *
+ * @param alterConfigOps The incremental configuration operations to apply.
+ * @param configProps The resource properties. This will be modified by
this function.
+ * @param configKeys Information about configuration key types.
+ */
+ def prepareIncrementalConfigs(
+ alterConfigOps: Seq[AlterConfigOp],
+ configProps: Properties,
+ configKeys: Map[String, ConfigKey]
+ ): Unit = {
+ def listType(configName: String, configKeys: Map[String, ConfigKey]):
Boolean = {
+ val configKey = configKeys(configName)
+ if (configKey == null)
+ throw new InvalidConfigurationException(s"Unknown config name:
$configName")
+ configKey.`type` == ConfigDef.Type.LIST
+ }
+
+ alterConfigOps.foreach { alterConfigOp =>
+ val configPropName = alterConfigOp.configEntry.name
+ alterConfigOp.opType() match {
+ case OpType.SET =>
configProps.setProperty(alterConfigOp.configEntry.name,
alterConfigOp.configEntry.value)
+ case OpType.DELETE =>
configProps.remove(alterConfigOp.configEntry.name)
+ case OpType.APPEND => {
+ if (!listType(alterConfigOp.configEntry.name, configKeys))
+ throw new InvalidRequestException(s"Config value append is not
allowed for config key: ${alterConfigOp.configEntry.name}")
+ val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
+
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
+ .getOrElse("")
+ .split(",").toList
+ val newValueList = oldValueList :::
alterConfigOp.configEntry.value.split(",").toList
+ configProps.setProperty(alterConfigOp.configEntry.name,
newValueList.mkString(","))
+ }
+ case OpType.SUBTRACT => {
+ if (!listType(alterConfigOp.configEntry.name, configKeys))
+ throw new InvalidRequestException(s"Config value subtract is not
allowed for config key: ${alterConfigOp.configEntry.name}")
+ val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
+
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
+ .getOrElse("")
+ .split(",").toList
+ val newValueList =
oldValueList.diff(alterConfigOp.configEntry.value.split(",").toList)
+ configProps.setProperty(alterConfigOp.configEntry.name,
newValueList.mkString(","))
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ab8639b..2fe49ad 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -42,7 +42,7 @@ import scala.collection.Seq
import scala.util.Try
/**
- * The ConfigHandler is used to process config change notifications received
by the DynamicConfigManager
+ * The ConfigHandler is used to process broker configuration change
notifications.
*/
trait ConfigHandler {
def processConfigChanges(entityName: String, value: Properties): Unit
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index ed9b55a..5681232 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -112,8 +112,12 @@ class ControllerApis(val requestChannel: RequestChannel,
}
} catch {
case e: FatalExitError => throw e
- case e: ExecutionException => requestHelper.handleError(request,
e.getCause)
- case e: Throwable => requestHelper.handleError(request, e)
+ case e: Throwable => {
+ val t = if (e.isInstanceOf[ExecutionException]) e.getCause() else e
+ error(s"Unexpected error handling request ${request.requestDesc(true)}
" +
+ s"with context ${request.context}", t)
+ requestHelper.handleError(request, t)
+ }
}
}
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 59a1f3c..5cc075e 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -29,6 +29,20 @@ import org.apache.kafka.common.internals.Topic
import scala.collection.mutable
+/**
+ * The validator that the controller uses for dynamic configuration changes.
+ * It performs the generic validation, which can't be bypassed. If an AlterConfigPolicy
+ * is configured, the controller checks that policy after this validation passes.
+ *
+ * For changes to BROKER resources, the forwarding broker performs an extra
validation step
+ * in {@link kafka.server.ConfigAdminManager#preprocess()} before sending the
change to
+ * the controller. Therefore, the validation here is just a kind of sanity
check, which
+ * should never fail under normal conditions.
+ *
+ * This validator does not handle changes to BROKER_LOGGER resources. Despite
being bundled
+ * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the
same sense
+ * as the others. It is not persisted to the metadata log (or to ZK, when
we're in that mode).
+ */
class ControllerConfigurationValidator extends ConfigurationValidator {
override def validate(resource: ConfigResource, config: util.Map[String,
String]): Unit = {
resource.`type`() match {
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 1f204e2..cb6cd84 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -252,8 +252,6 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId,
kafkaServer))
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
- if (kafkaServer.logManager.cleaner != null)
- addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager,
kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index e84592b..ce7f35f 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -30,7 +30,51 @@ import scala.compat.java8.OptionConverters._
trait ForwardingManager {
def forwardRequest(
- request: RequestChannel.Request,
+ originalRequest: RequestChannel.Request,
+ responseCallback: Option[AbstractResponse] => Unit
+ ): Unit = {
+ val buffer = originalRequest.buffer.duplicate()
+ buffer.flip()
+ forwardRequest(originalRequest.context,
+ buffer,
+ originalRequest.body[AbstractRequest],
+ () => originalRequest.toString,
+ responseCallback)
+ }
+
+ def forwardRequest(
+ originalRequest: RequestChannel.Request,
+ newRequestBody: AbstractRequest,
+ responseCallback: Option[AbstractResponse] => Unit
+ ): Unit = {
+ val buffer = newRequestBody.serializeWithHeader(originalRequest.header)
+ forwardRequest(originalRequest.context,
+ buffer,
+ newRequestBody,
+ () => originalRequest.toString,
+ responseCallback)
+ }
+
+ /**
+ * Forward the given request to the active controller.
+ *
+ * @param requestContext The request context of the original envelope
request.
+ * @param requestBufferCopy The request buffer we want to send. This
should not be the original
+ * byte buffer from the envelope request, since
we will be mutating
+ * the position and limit fields. It should be a
copy.
+ * @param requestBody The AbstractRequest we are sending.
+ * @param requestToString A callback which can be invoked to produce a human-readable
+ * description of the request.
+ * @param responseCallback A callback which takes in an
`Option[AbstractResponse]`.
+ * We will call this function with Some(x) after
the controller responds with x.
+ * Or, if the controller doesn't support the
request version, we will complete
+ * the callback with None.
+ */
+ def forwardRequest(
+ requestContext: RequestContext,
+ requestBufferCopy: ByteBuffer,
+ requestBody: AbstractRequest,
+ requestToString: () => String,
responseCallback: Option[AbstractResponse] => Unit
): Unit
@@ -63,32 +107,24 @@ class ForwardingManagerImpl(
channelManager: BrokerToControllerChannelManager
) extends ForwardingManager with Logging {
- /**
- * Forward given request to the active controller.
- *
- * @param request request to be forwarded
- * @param responseCallback callback which takes in an
`Option[AbstractResponse]`, where
- * None is indicating that controller doesn't
support the request
- * version.
- */
override def forwardRequest(
- request: RequestChannel.Request,
+ requestContext: RequestContext,
+ requestBufferCopy: ByteBuffer,
+ requestBody: AbstractRequest,
+ requestToString: () => String,
responseCallback: Option[AbstractResponse] => Unit
): Unit = {
- val requestBuffer = request.buffer.duplicate()
- requestBuffer.flip()
- val envelopeRequest =
ForwardingManager.buildEnvelopeRequest(request.context, requestBuffer)
+ val envelopeRequest =
ForwardingManager.buildEnvelopeRequest(requestContext, requestBufferCopy)
class ForwardingResponseHandler extends ControllerRequestCompletionHandler
{
override def onComplete(clientResponse: ClientResponse): Unit = {
- val requestBody = request.body[AbstractRequest]
if (clientResponse.versionMismatch != null) {
- debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request
$requestBody " +
+ debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to
${requestToString()} " +
s"due to unexpected version error", clientResponse.versionMismatch)
responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
} else if (clientResponse.authenticationException != null) {
- debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request
$requestBody " +
+ debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to
${requestToString()} " +
s"due to authentication error",
clientResponse.authenticationException)
responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
} else {
@@ -108,10 +144,10 @@ class ForwardingManagerImpl(
// the error directly to the client since it would not be
expected. Instead we
// return `UNKNOWN_SERVER_ERROR` so that the user knows that
there is a problem
// on the broker.
- debug(s"Forwarded request $request failed with an error in the
envelope response $envelopeError")
+ debug(s"Forwarded request ${requestToString()} failed with an
error in the envelope response $envelopeError")
requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
} else {
- parseResponse(envelopeResponse.responseData, requestBody,
request.header)
+ parseResponse(envelopeResponse.responseData, requestBody,
requestContext.header)
}
responseCallback(Option(response))
}
@@ -119,8 +155,8 @@ class ForwardingManagerImpl(
}
override def onTimeout(): Unit = {
- debug(s"Forwarding of the request $request failed due to timeout
exception")
- val response = request.body[AbstractRequest].getErrorResponse(new
TimeoutException())
+ debug(s"Forwarding of the request ${requestToString()} failed due to
timeout exception")
+ val response = requestBody.getErrorResponse(new TimeoutException())
responseCallback(Option(response))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5978321..e1c4338 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,7 +56,7 @@ import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEn
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
@@ -113,6 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker",
config)
+ val configManager = new ConfigAdminManager(brokerId, config,
configRepository)
def close(): Unit = {
aclApis.close()
@@ -130,17 +131,19 @@ class KafkaApis(val requestChannel: RequestChannel,
def responseCallback(responseOpt: Option[AbstractResponse]): Unit = {
responseOpt match {
case Some(response) => requestHelper.sendForwardedResponse(request,
response)
- case None =>
- info(s"The client connection will be closed due to controller
responded " +
- s"unsupported version exception during $request forwarding. " +
- s"This could happen when the controller changed after the
connection was established.")
- requestChannel.closeConnection(request, Collections.emptyMap())
+ case None => handleInvalidVersionsDuringForwarding(request)
}
}
-
metadataSupport.maybeForward(request, handler, responseCallback)
}
+ private def handleInvalidVersionsDuringForwarding(request:
RequestChannel.Request): Unit = {
+ info(s"The client connection will be closed due to controller responded " +
+ s"unsupported version exception during $request forwarding. " +
+ s"This could happen when the controller changed after the connection was
established.")
+ requestChannel.closeConnection(request, Collections.emptyMap())
+ }
+
private def forwardToControllerOrFail(
request: RequestChannel.Request
): Unit = {
@@ -198,7 +201,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => maybeForwardToController(request,
handleCreateAcls)
case ApiKeys.DELETE_ACLS => maybeForwardToController(request,
handleDeleteAcls)
- case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request,
handleAlterConfigsRequest)
+ case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS =>
handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
@@ -210,7 +213,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_DELEGATION_TOKEN =>
handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request,
requestLocal)
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request,
handleElectLeaders)
- case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
+ case ApiKeys.INCREMENTAL_ALTER_CONFIGS =>
handleIncrementalAlterConfigsRequest(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS =>
maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS =>
maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request,
requestLocal)
@@ -1301,7 +1304,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestThrottleMs,
brokers.toList.asJava,
clusterId,
-
metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
@@ -2621,16 +2624,45 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
- val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
- val alterConfigsRequest = request.body[AlterConfigsRequest]
+ val original = request.body[AlterConfigsRequest]
+ val preprocessingResponses = configManager.preprocess(original.data())
+ val remaining =
ConfigAdminManager.copyWithoutPreprocessed(original.data(),
preprocessingResponses)
+ def sendResponse(secondPart: Option[ApiMessage]): Unit = {
+ secondPart match {
+ case Some(result: AlterConfigsResponseData) =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new
AlterConfigsResponse(ConfigAdminManager.reassembleLegacyResponse(
+ original.data(),
+ preprocessingResponses,
+ result).setThrottleTimeMs(requestThrottleMs)))
+ case _ => handleInvalidVersionsDuringForwarding(request)
+ }
+ }
+ if (remaining.resources().isEmpty) {
+ sendResponse(Some(new AlterConfigsResponseData()))
+ } else if ((!request.isForwarded) && metadataSupport.canForward()) {
+ metadataSupport.forwardingManager.get.forwardRequest(request,
+ new AlterConfigsRequest(remaining, request.header.apiVersion()),
+ response => sendResponse(response.map(_.data())))
+ } else {
+ sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining)))
+ }
+ }
+
+ def processLegacyAlterConfigsRequest(
+ originalRequest: RequestChannel.Request,
+ data: AlterConfigsRequestData
+ ): AlterConfigsResponseData = {
+ val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
+ val alterConfigsRequest = new AlterConfigsRequest(data,
originalRequest.header.apiVersion())
val (authorizedResources, unauthorizedResources) =
alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
resource.`type` match {
case ConfigResource.Type.BROKER_LOGGER =>
throw new InvalidRequestException(s"AlterConfigs is deprecated and
does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
case ConfigResource.Type.BROKER =>
- authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER,
CLUSTER_NAME)
+ authHelper.authorize(originalRequest.context, ALTER_CONFIGS,
CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
- authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC,
resource.name)
+ authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC,
resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource
type $rt")
}
}
@@ -2638,19 +2670,15 @@ class KafkaApis(val requestChannel: RequestChannel,
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
- def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
- val data = new AlterConfigsResponseData()
- .setThrottleTimeMs(requestThrottleMs)
- (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error)
=>
- data.responses().add(new AlterConfigsResourceResponse()
- .setErrorCode(error.error.code)
- .setErrorMessage(error.message)
- .setResourceName(resource.name)
- .setResourceType(resource.`type`.id))
- }
- new AlterConfigsResponse(data)
+ val response = new AlterConfigsResponseData()
+ (authorizedResult ++ unauthorizedResult).foreach { case (resource, error)
=>
+ response.responses().add(new AlterConfigsResourceResponse()
+ .setErrorCode(error.error.code)
+ .setErrorMessage(error.message)
+ .setResourceName(resource.name)
+ .setResourceType(resource.`type`.id))
}
- requestHelper.sendResponseMaybeThrottle(request, responseCallback)
+ response
}
def handleAlterPartitionReassignmentsRequest(request:
RequestChannel.Request): Unit = {
@@ -2750,10 +2778,40 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleIncrementalAlterConfigsRequest(request: RequestChannel.Request):
Unit = {
- val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
- val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+ val original = request.body[IncrementalAlterConfigsRequest]
+ val preprocessingResponses = configManager.preprocess(original.data(),
+ (rType, rName) => authHelper.authorize(request.context, ALTER_CONFIGS,
rType, rName))
+ val remaining =
ConfigAdminManager.copyWithoutPreprocessed(original.data(),
preprocessingResponses)
+
+ def sendResponse(secondPart: Option[ApiMessage]): Unit = {
+ secondPart match {
+ case Some(result: IncrementalAlterConfigsResponseData) =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+ new
IncrementalAlterConfigsResponse(ConfigAdminManager.reassembleIncrementalResponse(
+ original.data(),
+ preprocessingResponses,
+ result).setThrottleTimeMs(requestThrottleMs)))
+ case _ => handleInvalidVersionsDuringForwarding(request)
+ }
+ }
- val configs = alterConfigsRequest.data.resources.iterator.asScala.map {
alterConfigResource =>
+ if (remaining.resources().isEmpty) {
+ sendResponse(Some(new IncrementalAlterConfigsResponseData()))
+ } else if ((!request.isForwarded) && metadataSupport.canForward()) {
+ metadataSupport.forwardingManager.get.forwardRequest(request,
+ new IncrementalAlterConfigsRequest(remaining,
request.header.apiVersion()),
+ response => sendResponse(response.map(_.data())))
+ } else {
+ sendResponse(Some(processIncrementalAlterConfigsRequest(request,
remaining)))
+ }
+ }
+
+ def processIncrementalAlterConfigsRequest(
+ originalRequest: RequestChannel.Request,
+ data: IncrementalAlterConfigsRequestData
+ ): IncrementalAlterConfigsResponseData = {
+ val zkSupport =
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))
+ val configs = data.resources.iterator.asScala.map { alterConfigResource =>
val configResource = new
ConfigResource(ConfigResource.Type.forId(alterConfigResource.resourceType),
alterConfigResource.resourceName)
configResource -> alterConfigResource.configs.iterator.asScala.map {
@@ -2765,20 +2823,18 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedResources, unauthorizedResources) = configs.partition {
case (resource, _) =>
resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
- authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER,
CLUSTER_NAME)
+ authHelper.authorize(originalRequest.context, ALTER_CONFIGS,
CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
- authHelper.authorize(request.context, ALTER_CONFIGS, TOPIC,
resource.name)
+ authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC,
resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource
type $rt")
}
}
- val authorizedResult =
zkSupport.adminManager.incrementalAlterConfigs(authorizedResources,
alterConfigsRequest.data.validateOnly)
+ val authorizedResult =
zkSupport.adminManager.incrementalAlterConfigs(authorizedResources,
data.validateOnly)
val unauthorizedResult = unauthorizedResources.keys.map { resource =>
resource -> configsAuthorizationApiError(resource)
}
-
- requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new
IncrementalAlterConfigsResponse(
- requestThrottleMs, (authorizedResult ++ unauthorizedResult).asJava))
+ new IncrementalAlterConfigsResponse(0, (authorizedResult ++
unauthorizedResult).asJava).data()
}
def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
@@ -3287,7 +3343,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val brokers =
metadataCache.getAliveBrokerNodes(request.context.listenerName)
- val controllerId =
metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+ val controllerId =
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val data = new DescribeClusterResponseData()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index d384427..bd899e1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -121,7 +121,7 @@ class KafkaServer(
var tokenManager: DelegationTokenManager = null
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
- var dynamicConfigManager: DynamicConfigManager = null
+ var dynamicConfigManager: ZkConfigManager = null
var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null
@@ -233,7 +233,7 @@ class KafkaServer(
this.logIdent = logContext.logPrefix
// initialize dynamic broker configs from ZooKeeper. Any updates made
after this will be
- // applied after DynamicConfigManager starts.
+ // applied after ZkConfigManager starts.
config.dynamicConfig.initialize(Some(zkClient))
/* start scheduler */
@@ -428,6 +428,7 @@ class KafkaServer(
/* Add all reconfigurables for config change notification before
starting config handlers */
config.dynamicConfig.addReconfigurables(this)
+
Option(logManager.cleaner).foreach(config.dynamicConfig.addBrokerReconfigurable)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic ->
new TopicConfigHandler(logManager, config, quotaManagers,
Some(kafkaController)),
@@ -437,7 +438,7 @@ class KafkaServer(
ConfigType.Ip ->
new IpConfigHandler(socketServer.connectionQuotas))
// Create the config manager. start listening to notifications
- dynamicConfigManager = new DynamicConfigManager(zkClient,
dynamicConfigHandlers)
+ dynamicConfigManager = new ZkConfigManager(zkClient,
dynamicConfigHandlers)
dynamicConfigManager.startup()
socketServer.startProcessingRequests(authorizerFutures)
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala
b/core/src/main/scala/kafka/server/MetadataSupport.scala
index ecacffa..aeedd40 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -56,11 +56,19 @@ sealed trait MetadataSupport {
*/
def ensureConsistentWith(config: KafkaConfig): Unit
- def maybeForward(request: RequestChannel.Request,
- handler: RequestChannel.Request => Unit,
- responseCallback: Option[AbstractResponse] => Unit): Unit
-
- def controllerId: Option[Int]
+ def canForward(): Boolean
+
+ def maybeForward(
+ request: RequestChannel.Request,
+ handler: RequestChannel.Request => Unit,
+ responseCallback: Option[AbstractResponse] => Unit
+ ): Unit = {
+ if (!request.isForwarded && canForward()) {
+ forwardingManager.get.forwardRequest(request, responseCallback)
+ } else {
+ handler(request)
+ }
+ }
}
case class ZkSupport(adminManager: ZkAdminManager,
@@ -79,16 +87,7 @@ case class ZkSupport(adminManager: ZkAdminManager,
}
}
- override def maybeForward(request: RequestChannel.Request,
- handler: RequestChannel.Request => Unit,
- responseCallback: Option[AbstractResponse] =>
Unit): Unit = {
- forwardingManager match {
- case Some(mgr) if !request.isForwarded && !controller.isActive =>
mgr.forwardRequest(request, responseCallback)
- case _ => handler(request)
- }
- }
-
- override def controllerId: Option[Int] = metadataCache.getControllerId
+ override def canForward(): Boolean = forwardingManager.isDefined &&
(!controller.isActive)
}
case class RaftSupport(fwdMgr: ForwardingManager, metadataCache:
KRaftMetadataCache)
@@ -103,19 +102,5 @@ case class RaftSupport(fwdMgr: ForwardingManager,
metadataCache: KRaftMetadataCa
}
}
- override def maybeForward(request: RequestChannel.Request,
- handler: RequestChannel.Request => Unit,
- responseCallback: Option[AbstractResponse] =>
Unit): Unit = {
- if (!request.isForwarded) {
- fwdMgr.forwardRequest(request, responseCallback)
- } else {
- handler(request) // will reject
- }
- }
-
- /**
- * Get the broker ID to return from a MetadataResponse. This will be a
broker ID, as
- * described in KRaftMetadataCache#getControllerId. See that function for
more details.
- */
- override def controllerId: Option[Int] = metadataCache.getControllerId
+ override def canForward(): Boolean = true
}
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index d2e7456..465004d 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -22,18 +22,16 @@ import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
-import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs,
toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.metadata.ZkConfigRepository
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
-import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.config.ConfigDef.ConfigKey
-import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource}
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.errors.{ApiException,
InvalidConfigurationException, InvalidPartitionsException,
InvalidReplicaAssignmentException, InvalidRequestException,
ReassignmentInProgressException, TopicExistsException,
UnknownTopicOrPartitionException, UnsupportedVersionException}
import
org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
@@ -388,14 +386,10 @@ class ZkAdminManager(val config: KafkaConfig,
}
}
- def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config],
validateOnly: Boolean): Map[ConfigResource, ApiError] = {
+ def alterConfigs(configs: Map[ConfigResource, AlterConfigsRequest.Config],
validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, config) =>
try {
- val nullUpdates = config.entries.asScala.filter(_.value ==
null).map(_.name)
- if (nullUpdates.nonEmpty)
- throw new InvalidRequestException(s"Null value not supported for :
${nullUpdates.mkString(",")}")
-
val configEntriesMap = config.entries.asScala.map(entry =>
(entry.name, entry.value)).toMap
val configProps = new Properties
@@ -468,29 +462,6 @@ class ZkAdminManager(val config: KafkaConfig,
resource -> ApiError.NONE
}
- private def toLoggableProps(resource: ConfigResource, configProps:
Properties): Map[String, String] = {
- configProps.asScala.map {
- case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`,
key, value))
- }
- }
-
- private def alterLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]): Unit =
{
- alterConfigOps.foreach { alterConfigOp =>
- val loggerName = alterConfigOp.configEntry().name()
- val logLevel = alterConfigOp.configEntry().value()
- alterConfigOp.opType() match {
- case OpType.SET =>
- info(s"Updating the log level of $loggerName to $logLevel")
- Log4jController.logLevel(loggerName, logLevel)
- case OpType.DELETE =>
- info(s"Unset the log level of $loggerName")
- Log4jController.unsetLogLevel(loggerName)
- case _ => throw new IllegalArgumentException(
- s"Log level cannot be changed for OpType: ${alterConfigOp.opType()}")
- }
- }
- }
-
private def getBrokerId(resource: ConfigResource) = {
if (resource.name == null || resource.name.isEmpty)
None
@@ -514,18 +485,6 @@ class ZkAdminManager(val config: KafkaConfig,
def incrementalAlterConfigs(configs: Map[ConfigResource,
Seq[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, alterConfigOps) =>
try {
- // throw InvalidRequestException if any duplicate keys
- val duplicateKeys = alterConfigOps.groupBy(config =>
config.configEntry.name).filter { case (_, v) =>
- v.size > 1
- }.keySet
- if (duplicateKeys.nonEmpty)
- throw new InvalidRequestException(s"Error due to duplicate config
keys : ${duplicateKeys.mkString(",")}")
- val nullUpdates = alterConfigOps
- .filter(entry => entry.configEntry.value == null && entry.opType()
!= OpType.DELETE)
- .map(entry => s"${entry.opType}:${entry.configEntry.name}")
- if (nullUpdates.nonEmpty)
- throw new InvalidRequestException(s"Null value not supported for :
${nullUpdates.mkString(",")}")
-
val configEntriesMap = alterConfigOps.map(entry =>
(entry.configEntry.name, entry.configEntry.value)).toMap
resource.`type` match {
@@ -545,13 +504,6 @@ class ZkAdminManager(val config: KafkaConfig,
prepareIncrementalConfigs(alterConfigOps, configProps,
KafkaConfig.configKeys)
alterBrokerConfigs(resource, validateOnly, configProps,
configEntriesMap)
- case ConfigResource.Type.BROKER_LOGGER =>
- getBrokerId(resource)
- validateLogLevelConfigs(alterConfigOps)
-
- if (!validateOnly)
- alterLogLevelConfigs(alterConfigOps)
- resource -> ApiError.NONE
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported
for topics and brokers, but resource type is $resourceType")
}
@@ -572,73 +524,6 @@ class ZkAdminManager(val config: KafkaConfig,
}.toMap
}
- private def validateLogLevelConfigs(alterConfigOps: Seq[AlterConfigOp]):
Unit = {
- def validateLoggerNameExists(loggerName: String): Unit = {
- if (!Log4jController.loggerExists(loggerName))
- throw new ConfigException(s"Logger $loggerName does not exist!")
- }
-
- alterConfigOps.foreach { alterConfigOp =>
- val loggerName = alterConfigOp.configEntry.name
- alterConfigOp.opType() match {
- case OpType.SET =>
- validateLoggerNameExists(loggerName)
- val logLevel = alterConfigOp.configEntry.value
- if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
- val validLevelsStr =
LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
- throw new ConfigException(
- s"Cannot set the log level of $loggerName to $logLevel as it is
not a supported log level. " +
- s"Valid log levels are $validLevelsStr"
- )
- }
- case OpType.DELETE =>
- validateLoggerNameExists(loggerName)
- if (loggerName == Log4jController.ROOT_LOGGER)
- throw new InvalidRequestException(s"Removing the log level of the
${Log4jController.ROOT_LOGGER} logger is not allowed")
- case OpType.APPEND => throw new
InvalidRequestException(s"${OpType.APPEND} operation is not allowed for the
${ConfigResource.Type.BROKER_LOGGER} resource")
- case OpType.SUBTRACT => throw new
InvalidRequestException(s"${OpType.SUBTRACT} operation is not allowed for the
${ConfigResource.Type.BROKER_LOGGER} resource")
- }
- }
- }
-
- private def prepareIncrementalConfigs(alterConfigOps: Seq[AlterConfigOp],
configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
-
- def listType(configName: String, configKeys: Map[String, ConfigKey]):
Boolean = {
- val configKey = configKeys(configName)
- if (configKey == null)
- throw new InvalidConfigurationException(s"Unknown topic config name:
$configName")
- configKey.`type` == ConfigDef.Type.LIST
- }
-
- alterConfigOps.foreach { alterConfigOp =>
- val configPropName = alterConfigOp.configEntry.name
- alterConfigOp.opType() match {
- case OpType.SET =>
configProps.setProperty(alterConfigOp.configEntry.name,
alterConfigOp.configEntry.value)
- case OpType.DELETE =>
configProps.remove(alterConfigOp.configEntry.name)
- case OpType.APPEND => {
- if (!listType(alterConfigOp.configEntry.name, configKeys))
- throw new InvalidRequestException(s"Config value append is not
allowed for config key: ${alterConfigOp.configEntry.name}")
- val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
-
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
- .getOrElse("")
- .split(",").toList
- val newValueList = oldValueList :::
alterConfigOp.configEntry.value.split(",").toList
- configProps.setProperty(alterConfigOp.configEntry.name,
newValueList.mkString(","))
- }
- case OpType.SUBTRACT => {
- if (!listType(alterConfigOp.configEntry.name, configKeys))
- throw new InvalidRequestException(s"Config value subtract is not
allowed for config key: ${alterConfigOp.configEntry.name}")
- val oldValueList =
Option(configProps.getProperty(alterConfigOp.configEntry.name))
-
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue,
ConfigDef.Type.LIST)))
- .getOrElse("")
- .split(",").toList
- val newValueList =
oldValueList.diff(alterConfigOp.configEntry.value.split(",").toList)
- configProps.setProperty(alterConfigOp.configEntry.name,
newValueList.mkString(","))
- }
- }
- }
- }
-
def shutdown(): Unit = {
topicPurgatory.shutdown()
CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
b/core/src/main/scala/kafka/server/ZkConfigManager.scala
similarity index 96%
rename from core/src/main/scala/kafka/server/DynamicConfigManager.scala
rename to core/src/main/scala/kafka/server/ZkConfigManager.scala
index 3eed382..c763d3f 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/ZkConfigManager.scala
@@ -59,7 +59,7 @@ object ConfigEntityName {
*
* To avoid watching all topics for changes instead we have a notification path
* /config/changes
- * The DynamicConfigManager has a child watch on this path.
+ * The ZkConfigManager has a child watch on this path.
*
* To update a config we first update the config properties. Then we create a
new sequential
* znode under the change path which contains the name of the entityType and
entityName that was updated, say
@@ -84,10 +84,12 @@ object ConfigEntityName {
* on startup where a change might be missed between the initial config load
and registering for change notifications.
*
*/
-class DynamicConfigManager(private val zkClient: KafkaZkClient,
- private val configHandlers: Map[String,
ConfigHandler],
- private val changeExpirationMs: Long = 15*60*1000,
- private val time: Time = Time.SYSTEM) extends
Logging {
+class ZkConfigManager(
+ private val zkClient: KafkaZkClient,
+ private val configHandlers: Map[String, ConfigHandler],
+ private val changeExpirationMs: Long = 15*60*1000,
+ private val time: Time = Time.SYSTEM
+) extends Logging {
val adminZkClient = new AdminZkClient(zkClient)
object ConfigChangedNotificationHandler extends NotificationHandler {
@@ -182,3 +184,4 @@ class DynamicConfigManager(private val zkClient:
KafkaZkClient,
configChangeListener.close()
}
}
+
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 702d227..55fc1f8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -252,6 +252,9 @@ class BrokerMetadataListener(
val delta = _delta
_image = _delta.apply()
_delta = new MetadataDelta(_image)
+ if (isDebugEnabled) {
+ debug(s"Publishing new metadata delta ${delta} at offset
${_image.highestOffsetAndEpoch().offset}.")
+ }
publisher.publish(delta, _image)
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 2c56ed6..7c6d190 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -19,12 +19,12 @@ package kafka.server.metadata
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{UnifiedLog, LogManager}
-import kafka.server.ConfigType
-import kafka.server.{ConfigEntityName, ConfigHandler, FinalizedFeatureCache,
KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.log.{LogManager, UnifiedLog}
+import kafka.server.ConfigAdminManager.toLoggableProps
+import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType,
FinalizedFeatureCache, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta,
TopicsImage}
@@ -175,19 +175,31 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Apply configuration deltas.
Option(delta.configsDelta()).foreach { configsDelta =>
- configsDelta.changes().keySet().forEach { configResource =>
- val tag = configResource.`type`() match {
- case ConfigResource.Type.TOPIC => Some(ConfigType.Topic)
- case ConfigResource.Type.BROKER => Some(ConfigType.Broker)
- case _ => None
- }
- tag.foreach { t =>
- val newProperties =
newImage.configs().configProperties(configResource)
- val maybeDefaultName = configResource.name() match {
- case "" => ConfigEntityName.Default
- case k => k
+ configsDelta.changes().keySet().forEach { resource =>
+ val props = newImage.configs().configProperties(resource)
+ resource.`type`() match {
+ case TOPIC =>
+ // Apply changes to a topic's dynamic configuration.
+ info(s"Updating topic ${resource.name()} with new configuration
: " +
+ toLoggableProps(resource, props).mkString(","))
+ dynamicConfigHandlers(ConfigType.Topic).
+ processConfigChanges(resource.name(), props)
+ conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
+ case BROKER => if (resource.name().isEmpty) {
+ // Apply changes to "cluster configs" (also known as default BROKER configs).
+ // These are stored in KRaft with an empty name field.
+ info(s"Updating cluster configuration : " +
+ toLoggableProps(resource, props).mkString(","))
+ dynamicConfigHandlers(ConfigType.Broker).
+ processConfigChanges(ConfigEntityName.Default, props)
+ } else if (resource.name().equals(brokerId.toString)) {
+ // Apply changes to this broker's dynamic configuration.
+ info(s"Updating broker ${brokerId} with new configuration : " +
+ toLoggableProps(resource, props).mkString(","))
+ dynamicConfigHandlers(ConfigType.Broker).
+ processConfigChanges(resource.name(), props)
}
- dynamicConfigHandlers(t).processConfigChanges(maybeDefaultName,
newProperties)
+ case _ => // nothing to do
}
}
}
@@ -258,6 +270,11 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// recovery-from-unclean-shutdown if required.
logManager.startup(metadataCache.getAllTopics())
+ // Make the LogCleaner available for reconfiguration. We can't do this prior to this
+ // point because LogManager#startup creates the LogCleaner object, if
+ // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
+
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+
// Start the replica manager.
replicaManager.startup()
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 95fe752..8f8dfcd 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -19,7 +19,7 @@ package kafka.server.metadata
import java.util.Properties
-import kafka.server.ConfigType
+import kafka.server.{ConfigEntityName, ConfigType}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
@@ -37,6 +37,13 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository
case Type.BROKER => ConfigType.Broker
case tpe => throw new IllegalArgumentException(s"Unsupported config
type: $tpe")
}
- adminZkClient.fetchEntityConfig(configTypeForZk, configResource.name)
+ // ZK stores cluster configs under "<default>".
+ val effectiveName = if (configResource.`type`.equals(Type.BROKER) &&
+ configResource.name.isEmpty()) {
+ ConfigEntityName.Default
+ } else {
+ configResource.name
+ }
+ adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
}
}
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index bd66387..1dd4096 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,19 +26,20 @@ import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._
+
import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.LogConfig
+import kafka.log.{CleanerConfig, LogConfig}
import kafka.message.ProducerCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.network.{Processor, RequestChannel}
import kafka.server.QuorumTestHarness
import kafka.utils._
import kafka.utils.Implicits._
-import kafka.zk.{ConfigEntityChangeNotificationZNode}
+import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
@@ -485,12 +486,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.put(KafkaConfig.MessageMaxBytesProp, "40000")
props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
- reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogCleanerThreadsProp, "2"))
// Verify cleaner config was updated. Wait for one of the configs to be updated and verify
// that all other others were updated at the same time since they are reconfigured together
- val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
- TestUtils.waitUntilTrue(() => newCleanerConfig.numThreads == 2, "Log
cleaner not reconfigured")
+ var newCleanerConfig: CleanerConfig = null
+ TestUtils.waitUntilTrue(() => {
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogCleanerThreadsProp, "2"))
+ newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+ newCleanerConfig.numThreads == 2
+ }, "Log cleaner not reconfigured", 60000)
assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
assertEquals(300000, newCleanerConfig.ioBufferSize)
@@ -1426,7 +1430,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
brokerResources.foreach { brokerResource =>
val exception = assertThrows(classOf[ExecutionException], () =>
alterResult.values.get(brokerResource).get)
- assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
+ assertEquals(classOf[InvalidRequestException],
exception.getCause().getClass())
}
servers.foreach { server =>
assertEquals(oldProps, server.config.values.asScala.filter { case (k,
_) => newProps.containsKey(k) })
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 25e79ad..c62dbd5 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,18 +21,26 @@ import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment,
NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config,
ConfigEntry, NewPartitionReassignment, NewTopic}
import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.requests.{DescribeClusterRequest,
DescribeClusterResponse}
+import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest,
DescribeClusterResponse}
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
-
import java.util
+import java.util.concurrent.ExecutionException
import java.util.{Arrays, Collections, Optional}
+
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.protocol.Errors._
+import org.slf4j.LoggerFactory
+
+import scala.annotation.nowarn
import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@@ -40,6 +48,8 @@ import scala.jdk.CollectionConverters._
@Timeout(120)
@Tag("integration")
class KRaftClusterTest {
+ val log = LoggerFactory.getLogger(classOf[KRaftClusterTest])
+ val log2 =
LoggerFactory.getLogger(classOf[KRaftClusterTest].getCanonicalName() + "2")
@Test
def testCreateClusterAndClose(): Unit = {
@@ -459,4 +469,248 @@ class KRaftClusterTest {
topicsNotFound.isEmpty && extraTopics.isEmpty
}, s"Failed to find topic(s): ${topicsNotFound.asScala} and NOT find
topic(s): ${extraTopics}")
}
+
+ private def incrementalAlter(
+ admin: Admin,
+ changes: Seq[(ConfigResource, Seq[AlterConfigOp])]
+ ): Seq[ApiError] = {
+ val configs = new util.HashMap[ConfigResource,
util.Collection[AlterConfigOp]]()
+ changes.foreach {
+ case (resource, ops) => configs.put(resource, ops.asJava)
+ }
+ val values = admin.incrementalAlterConfigs(configs).values()
+ changes.map {
+ case (resource, _) => try {
+ values.get(resource).get()
+ ApiError.NONE
+ } catch {
+ case e: ExecutionException => ApiError.fromThrowable(e.getCause)
+ case t: Throwable => ApiError.fromThrowable(t)
+ }
+ }
+ }
+
+ private def validateConfigs(
+ admin: Admin,
+ expected: Map[ConfigResource, Seq[(String, String)]],
+ exhaustive: Boolean = false
+ ): Map[ConfigResource, util.Map[String, String]] = {
+ val results = new mutable.HashMap[ConfigResource, util.Map[String,
String]]()
+ TestUtils.retry(60000) {
+ try {
+ val values = admin.describeConfigs(expected.keySet.asJava).values()
+ results.clear()
+ assertEquals(expected.keySet, values.keySet().asScala)
+ expected.foreach {
+ case (resource, pairs) =>
+ val config = values.get(resource).get()
+ val actual = new util.TreeMap[String, String]()
+ val expected = new util.TreeMap[String, String]()
+ config.entries().forEach {
+ case entry =>
+ actual.put(entry.name(), entry.value())
+ if (!exhaustive) {
+ expected.put(entry.name(), entry.value())
+ }
+ }
+ pairs.foreach {
+ case (k, v) => expected.put(k, v)
+ }
+ assertEquals(expected, actual)
+ results.put(resource, actual)
+ }
+ } catch {
+ case t: Throwable =>
+ log.warn(s"Unable to describeConfigs(${expected.keySet.asJava})", t)
+ throw t
+ }
+ }
+ results.toMap
+ }
+
+ @Test
+ def testIncrementalAlterConfigs(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
+ (new ConfigResource(Type.BROKER, ""), Seq(
+ new AlterConfigOp(new ConfigEntry("log.roll.ms", "1234567"),
OpType.SET),
+ new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "6"),
OpType.SET))))))
+ validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
+ ("log.roll.ms", "1234567"),
+ ("max.connections.per.ip", "6"))), true)
+
+ admin.createTopics(Arrays.asList(
+ new NewTopic("foo", 2, 3.toShort),
+ new NewTopic("bar", 2, 3.toShort))).all().get()
+
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"foo", 2)
+
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"bar", 2)
+
+ validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "bar") ->
Seq()))
+
+ assertEquals(Seq(ApiError.NONE,
+ new ApiError(INVALID_CONFIG, "Unknown topic config name:
not.a.real.topic.config"),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not
exist.")),
+ incrementalAlter(admin, Seq(
+ (new ConfigResource(Type.TOPIC, "foo"), Seq(
+ new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "345"),
OpType.SET))),
+ (new ConfigResource(Type.TOPIC, "bar"), Seq(
+ new AlterConfigOp(new ConfigEntry("not.a.real.topic.config",
"789"), OpType.SET))),
+ (new ConfigResource(Type.TOPIC, "baz"), Seq(
+ new AlterConfigOp(new ConfigEntry("segment.jitter.ms", "678"),
OpType.SET))))))
+
+ validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") ->
Seq(
+ ("segment.jitter.ms", "345"))))
+
+ assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
+ (new ConfigResource(Type.BROKER, "2"), Seq(
+ new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "7"),
OpType.SET))))))
+
+ validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "2") -> Seq(
+ ("max.connections.per.ip", "7"))))
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
+ @Test
+ def testSetLog4jConfigurations(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ Seq(log, log2).foreach(_.debug("setting log4j"))
+
+ val broker2 = new ConfigResource(Type.BROKER_LOGGER, "2")
+ val broker3 = new ConfigResource(Type.BROKER_LOGGER, "3")
+ val initialLog4j = validateConfigs(admin, Map(broker2 -> Seq()))
+
+ assertEquals(Seq(ApiError.NONE,
+ new ApiError(INVALID_REQUEST, "APPEND operation is not allowed for
the BROKER_LOGGER resource")),
+ incrementalAlter(admin, Seq(
+ (broker2, Seq(
+ new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"),
OpType.SET),
+ new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"),
OpType.SET))),
+ (broker3, Seq(
+ new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"),
OpType.APPEND),
+ new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"),
OpType.APPEND))))))
+
+ validateConfigs(admin, Map(broker2 -> Seq(
+ (log.getName(), "TRACE"),
+ (log2.getName(), "TRACE"))))
+
+ assertEquals(Seq(ApiError.NONE,
+ new ApiError(INVALID_REQUEST, "SUBTRACT operation is not allowed for
the BROKER_LOGGER resource")),
+ incrementalAlter(admin, Seq(
+ (broker2, Seq(
+ new AlterConfigOp(new ConfigEntry(log.getName(), ""),
OpType.DELETE),
+ new AlterConfigOp(new ConfigEntry(log2.getName(), ""),
OpType.DELETE))),
+ (broker3, Seq(
+ new AlterConfigOp(new ConfigEntry(log.getName(), "TRACE"),
OpType.SUBTRACT),
+ new AlterConfigOp(new ConfigEntry(log2.getName(), "TRACE"),
OpType.SUBTRACT))))))
+
+ validateConfigs(admin, Map(broker2 -> Seq(
+ (log.getName(), initialLog4j.get(broker2).get.get(log.getName())),
+ (log2.getName(),
initialLog4j.get(broker2).get.get(log2.getName())))))
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
+
+ @nowarn("cat=deprecation") // Suppress warnings about using legacy alterConfigs
+ def legacyAlter(
+ admin: Admin,
+ resources: Map[ConfigResource, Seq[ConfigEntry]]
+ ): Seq[ApiError] = {
+ val configs = new util.HashMap[ConfigResource, Config]()
+ resources.foreach {
+ case (resource, entries) => configs.put(resource, new
Config(entries.asJava))
+ }
+ val values = admin.alterConfigs(configs).values()
+ resources.map {
+ case (resource, _) => try {
+ values.get(resource).get()
+ ApiError.NONE
+ } catch {
+ case e: ExecutionException => ApiError.fromThrowable(e.getCause)
+ case t: Throwable => ApiError.fromThrowable(t)
+ }
+ }.toSeq
+ }
+
+ @Test
+ def testLegacyAlterConfigs(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ val defaultBroker = new ConfigResource(Type.BROKER, "")
+
+ assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker
-> Seq(
+ new ConfigEntry("log.roll.ms", "1234567"),
+ new ConfigEntry("max.connections.per.ip", "6")))))
+
+ validateConfigs(admin, Map(defaultBroker -> Seq(
+ ("log.roll.ms", "1234567"),
+ ("max.connections.per.ip", "6"))), true)
+
+ assertEquals(Seq(ApiError.NONE), legacyAlter(admin, Map(defaultBroker
-> Seq(
+ new ConfigEntry("log.roll.ms", "1234567")))))
+
+ // Since max.connections.per.ip was left out of the previous legacyAlter, it is removed.
+ validateConfigs(admin, Map(defaultBroker -> Seq(
+ ("log.roll.ms", "1234567"))), true)
+
+ admin.createTopics(Arrays.asList(
+ new NewTopic("foo", 2, 3.toShort),
+ new NewTopic("bar", 2, 3.toShort))).all().get()
+
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"foo", 2)
+
TestUtils.waitForAllPartitionsMetadata(cluster.brokers().values().asScala.toSeq,
"bar", 2)
+ assertEquals(Seq(ApiError.NONE,
+ new ApiError(INVALID_CONFIG, "Unknown topic config name:
not.a.real.topic.config"),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "The topic 'baz' does not
exist.")),
+ legacyAlter(admin, Map(
+ new ConfigResource(Type.TOPIC, "foo") -> Seq(
+ new ConfigEntry("segment.jitter.ms", "345")),
+ new ConfigResource(Type.TOPIC, "bar") -> Seq(
+ new ConfigEntry("not.a.real.topic.config", "789")),
+ new ConfigResource(Type.TOPIC, "baz") -> Seq(
+ new ConfigEntry("segment.jitter.ms", "678")))))
+
+ validateConfigs(admin, Map(new ConfigResource(Type.TOPIC, "foo") ->
Seq(
+ ("segment.jitter.ms", "345"))))
+
+ } finally {
+ admin.close()
+ }
+ } finally {
+ cluster.close()
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
new file mode 100644
index 0000000..19390c3
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Collections
+
+import kafka.server.metadata.MockConfigRepository
+import kafka.utils.{Log4jController, TestUtils}
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER, TOPIC, UNKNOWN}
+import org.apache.kafka.common.config.LogLevelConfig.VALID_LOG_LEVELS
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException}
+import org.apache.kafka.common.message.{AlterConfigsRequestData,
AlterConfigsResponseData, IncrementalAlterConfigsRequestData,
IncrementalAlterConfigsResponseData}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource
=> LAlterConfigsResource}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection
=> LAlterConfigsResourceCollection}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection
=> LAlterableConfigCollection}
+import
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
=> LAlterConfigsResourceResponse}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig =>
LAlterableConfig}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection
=> IAlterConfigsResourceCollection}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig
=> IAlterableConfig}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection
=> IAlterableConfigCollection}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
=> IAlterConfigsResourceResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
+import org.apache.kafka.common.requests.ApiError
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Assertions, Test}
+import org.slf4j.LoggerFactory
+
+import scala.jdk.CollectionConverters._
+
+class ConfigAdminManagerTest {
+ val logger = LoggerFactory.getLogger(classOf[ConfigAdminManagerTest])
+
+ def newConfigAdminManager(brokerId: Integer): ConfigAdminManager = {
+ val config = TestUtils.createBrokerConfig(nodeId = brokerId, zkConnect =
null)
+ new ConfigAdminManager(brokerId, new KafkaConfig(config), new
MockConfigRepository())
+ }
+
+ def broker0Incremental(): IAlterConfigsResource = new
IAlterConfigsResource().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setConfigs(new IAlterableConfigCollection(
+ util.Arrays.asList(new IAlterableConfig().setName("foo").
+ setValue("bar").
+ setConfigOperation(OpType.SET.id())).iterator()))
+
+ def topicAIncremental(): IAlterConfigsResource = new IAlterConfigsResource().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setConfigs(new IAlterableConfigCollection(
+ util.Arrays.asList(new IAlterableConfig().setName("foo").
+ setValue("bar").
+ setConfigOperation(OpType.SET.id())).iterator()))
+
+ def broker0Legacy(): LAlterConfigsResource = new LAlterConfigsResource().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setConfigs(new LAlterableConfigCollection(
+ util.Arrays.asList(new LAlterableConfig().setName("foo").
+ setValue("bar")).iterator()))
+
+ def topicALegacy(): LAlterConfigsResource = new LAlterConfigsResource().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setConfigs(new LAlterableConfigCollection(
+ util.Arrays.asList(new LAlterableConfig().setName("foo").
+ setValue("bar")).iterator()))
+
+ val invalidRequestError = new ApiError(INVALID_REQUEST)
+
+ @Test
+ def testCopyWithoutPreprocessedForIncremental(): Unit = {
+ val broker0 = broker0Incremental()
+ val topicA = topicAIncremental()
+ val request = new
IncrementalAlterConfigsRequestData().setValidateOnly(true).
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0, topicA).iterator()))
+ val processed1 = new util.IdentityHashMap[IAlterConfigsResource,
ApiError]()
+ processed1.put(broker0, ApiError.NONE)
+ assertEquals(new
IncrementalAlterConfigsRequestData().setValidateOnly(true).
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ topicA.duplicate()).iterator())),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed1))
+ val processed2 = new util.IdentityHashMap[IAlterConfigsResource,
ApiError]()
+ assertEquals(new
IncrementalAlterConfigsRequestData().setValidateOnly(true).
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0.duplicate(), topicA.duplicate()).iterator())),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed2))
+ val processed3 = new util.IdentityHashMap[IAlterConfigsResource,
ApiError]()
+ processed3.put(broker0, ApiError.NONE)
+ processed3.put(topicA, ApiError.NONE)
+ assertEquals(new
IncrementalAlterConfigsRequestData().setValidateOnly(true),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed3))
+ }
+
+ @Test
+ def testCopyWithoutPreprocessedForLegacy(): Unit = {
+ val broker0 = broker0Legacy()
+ val topicA = topicALegacy()
+ val request = new AlterConfigsRequestData().setValidateOnly(true).
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0, topicA).iterator()))
+ val processed1 = new util.IdentityHashMap[LAlterConfigsResource,
ApiError]()
+ processed1.put(broker0, ApiError.NONE)
+ assertEquals(new AlterConfigsRequestData().setValidateOnly(true).
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ topicA.duplicate()).iterator())),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed1))
+ val processed2 = new util.IdentityHashMap[LAlterConfigsResource,
ApiError]()
+ assertEquals(new AlterConfigsRequestData().setValidateOnly(true).
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0.duplicate(), topicA.duplicate()).iterator())),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed2))
+ val processed3 = new util.IdentityHashMap[LAlterConfigsResource,
ApiError]()
+ processed3.put(broker0, ApiError.NONE)
+ processed3.put(topicA, ApiError.NONE)
+ assertEquals(new AlterConfigsRequestData().setValidateOnly(true),
+ ConfigAdminManager.copyWithoutPreprocessed(request, processed3))
+ }
+
+ @Test
+ def testReassembleIncrementalResponse(): Unit = {
+ val broker0 = broker0Incremental()
+ val topicA = topicAIncremental()
+ val original = new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0, topicA).iterator()))
+ val preprocessed1 = new util.IdentityHashMap[IAlterConfigsResource,
ApiError]()
+ preprocessed1.put(broker0, invalidRequestError)
+ val persistentResponses1 = new
IncrementalAlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new IAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null)))
+ assertEquals(new IncrementalAlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new IAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage(INVALID_REQUEST.message()),
+ new IAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null))),
+ ConfigAdminManager.reassembleIncrementalResponse(original,
preprocessed1, persistentResponses1))
+ val preprocessed2 = new util.IdentityHashMap[IAlterConfigsResource,
ApiError]()
+ val persistentResponses2 = new
IncrementalAlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new IAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new IAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ ))
+ assertEquals(new IncrementalAlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new IAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new IAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null))),
+ ConfigAdminManager.reassembleIncrementalResponse(original,
preprocessed2, persistentResponses2))
+ }
+
+ @Test
+ def testReassembleLegacyResponse(): Unit = {
+ val broker0 = broker0Legacy()
+ val topicA = topicALegacy()
+ val original = new AlterConfigsRequestData().
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ broker0, topicA).iterator()))
+ val preprocessed1 = new util.IdentityHashMap[LAlterConfigsResource,
ApiError]()
+ preprocessed1.put(broker0, invalidRequestError)
+ val persistentResponses1 = new AlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new LAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null)))
+ assertEquals(new AlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new LAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(INVALID_REQUEST.code()).
+ setErrorMessage(INVALID_REQUEST.message()),
+ new LAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null))),
+ ConfigAdminManager.reassembleLegacyResponse(original, preprocessed1,
persistentResponses1))
+ val preprocessed2 = new util.IdentityHashMap[LAlterConfigsResource,
ApiError]()
+ val persistentResponses2 = new AlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new LAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new LAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ ))
+ assertEquals(new AlterConfigsResponseData().setResponses(
+ util.Arrays.asList(new LAlterConfigsResourceResponse().
+ setResourceName("0").
+ setResourceType(BROKER.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null),
+ new LAlterConfigsResourceResponse().
+ setResourceName("a").
+ setResourceType(TOPIC.id()).
+ setErrorCode(NONE.code()).
+ setErrorMessage(null))),
+ ConfigAdminManager.reassembleLegacyResponse(original, preprocessed2,
persistentResponses2))
+ }
+
+ @Test
+ def testValidateResourceNameIsCurrentNodeId(): Unit = {
+ val manager = newConfigAdminManager(5)
+ manager.validateResourceNameIsCurrentNodeId("5")
+ assertEquals("Node id must be an integer, but it is: ",
+ Assertions.assertThrows(classOf[InvalidRequestException],
+ () => manager.validateResourceNameIsCurrentNodeId("")).getMessage())
+ assertEquals("Unexpected broker id, expected 5, but received 3",
+ Assertions.assertThrows(classOf[InvalidRequestException],
+ () => manager.validateResourceNameIsCurrentNodeId("3")).getMessage())
+ assertEquals("Node id must be an integer, but it is: e",
+ Assertions.assertThrows(classOf[InvalidRequestException],
+ () => manager.validateResourceNameIsCurrentNodeId("e")).getMessage())
+ }
+
+ @Test
+ def testValidateLogLevelConfigs(): Unit = {
+ val manager = newConfigAdminManager(5)
+ manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+ setName(logger.getName).
+ setConfigOperation(OpType.SET.id()).
+ setValue("TRACE")))
+ manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig().
+ setName(logger.getName).
+ setConfigOperation(OpType.DELETE.id()).
+ setValue("")))
+ assertEquals("APPEND operation is not allowed for the BROKER_LOGGER
resource",
+ Assertions.assertThrows(classOf[InvalidRequestException],
+ () => manager.validateLogLevelConfigs(util.Arrays.asList(new
IAlterableConfig().
+ setName(logger.getName).
+ setConfigOperation(OpType.APPEND.id()).
+ setValue("TRACE")))).getMessage())
+ assertEquals(s"Cannot set the log level of ${logger.getName} to BOGUS as
it is not " +
+ s"a supported log level. Valid log levels are
${VALID_LOG_LEVELS.asScala.mkString(", ")}",
+ Assertions.assertThrows(classOf[InvalidConfigurationException],
+ () => manager.validateLogLevelConfigs(util.Arrays.asList(new
IAlterableConfig().
+ setName(logger.getName).
+ setConfigOperation(OpType.SET.id()).
+ setValue("BOGUS")))).getMessage())
+ }
+
+ @Test // Checks root-logger handling in validateLogLevelConfigs: SET is expected to pass, DELETE must be rejected.
+ def testValidateRootLogLevelConfigs(): Unit = {
+ val manager = newConfigAdminManager(5)
+ manager.validateLogLevelConfigs(util.Arrays.asList(new IAlterableConfig(). // SET on Log4jController.ROOT_LOGGER: the test passes only if this does not throw
+ setName(Log4jController.ROOT_LOGGER).
+ setConfigOperation(OpType.SET.id()).
+ setValue("TRACE")))
+ assertEquals(s"Removing the log level of the
${Log4jController.ROOT_LOGGER} logger is not allowed",
+ Assertions.assertThrows(classOf[InvalidRequestException], // DELETE on the root logger must raise InvalidRequestException with the message above
+ () => manager.validateLogLevelConfigs(util.Arrays.asList(new
IAlterableConfig().
+ setName(Log4jController.ROOT_LOGGER).
+ setConfigOperation(OpType.DELETE.id()).
+ setValue("")))).getMessage())
+ }
+
+ def brokerLogger1Incremental(): IAlterConfigsResource = new
IAlterConfigsResource(). // Fixture: incremental-alter resource named "1" of type BROKER_LOGGER; a fresh instance is built on every call.
+ setResourceName("1").
+ setResourceType(BROKER_LOGGER.id).
+ setConfigs(new IAlterableConfigCollection(
+ util.Arrays.asList(new IAlterableConfig().setName(logger.getName). // single entry: SET logger.getName to "INFO"
+ setValue("INFO").
+ setConfigOperation(OpType.SET.id())).iterator()))
+
+ def brokerLogger2Incremental(): IAlterConfigsResource = new
IAlterConfigsResource(). // Fixture: incremental-alter resource named "2" of type BROKER_LOGGER whose only entry carries a null value.
+ setResourceName("2").
+ setResourceType(BROKER_LOGGER.id).
+ setConfigs(new IAlterableConfigCollection(
+ util.Arrays.asList(new IAlterableConfig().setName(logger.getName).
+ setValue(null). // the null value is deliberate; testPreprocessIncrementalWithNulls expects it to be rejected
+ setConfigOperation(OpType.SET.id())).iterator()))
+
+ @Test // preprocess on a BROKER_LOGGER resource with a callback that always returns false must yield CLUSTER_AUTHORIZATION_FAILED.
+ def testPreprocessIncrementalWithUnauthorizedBrokerLoggerChanges(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val brokerLogger1 = brokerLogger1Incremental()
+ assertEquals(Collections.singletonMap(brokerLogger1, // expected result: exactly one entry, keyed by the submitted resource
+ new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)),
+ manager.preprocess(new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger1).iterator())),
+ (_, _) => false)) // presumably the authorization check, here denying everything -- TODO confirm against ConfigAdminManager.preprocess
+ }
+
+ @Test // preprocess must map a BROKER_LOGGER resource carrying a null config value to INVALID_REQUEST.
+ def testPreprocessIncrementalWithNulls(): Unit = {
+ val manager = newConfigAdminManager(2)
+ val brokerLogger2 = brokerLogger2Incremental() // fixture whose single config entry has a null value
+ assertEquals(Collections.singletonMap(brokerLogger2,
+ new ApiError(INVALID_REQUEST, s"Null value not supported for :
${logger.getName}")),
+ manager.preprocess(new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger2).iterator())),
+ (_, _) => true)) // callback always returns true, so the error asserted above is the null-value one
+ }
+
+ @Test // preprocess on a well-formed BROKER_LOGGER resource with a callback returning true must yield Errors.NONE.
+ def testPreprocessIncrementalWithLoggerChanges(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val brokerLogger1 = brokerLogger1Incremental()
+ assertEquals(Collections.singletonMap(brokerLogger1, // the resource still appears in the result map, paired with a success ApiError
+ new ApiError(Errors.NONE, null)),
+ manager.preprocess(new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger1).iterator())),
+ (_, _) => true))
+ }
+
+ @Test // Submitting the same BROKER_LOGGER resource twice must mark both copies INVALID_REQUEST.
+ def testPreprocessIncrementalWithDuplicates(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val brokerLogger1a = brokerLogger1Incremental() // two separately built instances with identical contents
+ val brokerLogger1b = brokerLogger1Incremental()
+ val output = manager.preprocess(new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger1a, brokerLogger1b).iterator())),
+ (_, _) => true)
+ assertEquals(2, output.size()) // both instances are expected to have their own entry in the result
+ Seq(brokerLogger1a, brokerLogger1b).foreach(r =>
+ assertEquals(new ApiError(INVALID_REQUEST, "Each resource must appear at
most once."),
+ output.get(r)))
+ }
+
+ def brokerLogger1Legacy(): LAlterConfigsResource = new
LAlterConfigsResource(). // Fixture: legacy (non-incremental) alter-configs resource named "1" of type BROKER_LOGGER.
+ setResourceName("1").
+ setResourceType(BROKER_LOGGER.id).
+ setConfigs(new LAlterableConfigCollection(
+ util.Arrays.asList(new LAlterableConfig().setName(logger.getName). // single entry: logger.getName -> "INFO"; no config operation is set on this type here
+ setValue("INFO")).iterator()))
+
+ def broker2Legacy(): LAlterConfigsResource = new LAlterConfigsResource(). // Fixture: legacy alter-configs resource named "2"; note the type is BROKER, not BROKER_LOGGER.
+ setResourceName("2").
+ setResourceType(BROKER.id).
+ setConfigs(new LAlterableConfigCollection(
+ util.Arrays.asList(new LAlterableConfig().setName(logger.getName).
+ setValue(null)).iterator())) // the null value is deliberate; testPreprocessLegacyWithNulls expects it to be rejected
+
+ @Test // The legacy preprocess overload must reject a BROKER_LOGGER resource with INVALID_REQUEST.
+ def testPreprocessLegacyWithBrokerLoggerChanges(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val brokerLogger1 = brokerLogger1Legacy()
+ assertEquals(Collections.singletonMap(brokerLogger1,
+ new ApiError(INVALID_REQUEST, "Unknown resource type 8")), // presumably 8 is BROKER_LOGGER.id, the type set by the fixture -- TODO confirm
+ manager.preprocess(new AlterConfigsRequestData(). // this overload takes only the request data; no callback is passed
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger1).iterator()))))
+ }
+
+ @Test // The legacy preprocess overload must map a resource carrying a null config value to INVALID_REQUEST.
+ def testPreprocessLegacyWithNulls(): Unit = {
+ val manager = newConfigAdminManager(2)
+ val brokerLogger2 = broker2Legacy() // NOTE(review): despite the local name, broker2Legacy builds a BROKER (not BROKER_LOGGER) resource
+ assertEquals(Collections.singletonMap(brokerLogger2,
+ new ApiError(INVALID_REQUEST, s"Null value not supported for :
${logger.getName}")),
+ manager.preprocess(new AlterConfigsRequestData().
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger2).iterator()))))
+ }
+
+ @Test // Submitting the same legacy resource twice must mark both copies INVALID_REQUEST.
+ def testPreprocessLegacyWithDuplicates(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val brokerLogger1a = brokerLogger1Legacy() // two separately built instances with identical contents
+ val brokerLogger1b = brokerLogger1Legacy()
+ val output = manager.preprocess(new AlterConfigsRequestData().
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ brokerLogger1a, brokerLogger1b).iterator())))
+ assertEquals(2, output.size()) // both instances are expected to have their own entry in the result
+ Seq(brokerLogger1a, brokerLogger1b).foreach(r =>
+ assertEquals(new ApiError(INVALID_REQUEST, "Each resource must appear at
most once."),
+ output.get(r)))
+ }
+
+ def unknownIncremental(): IAlterConfigsResource = new
IAlterConfigsResource(). // Fixture: incremental-alter resource named "unknown" whose resource type is UNKNOWN.
+ setResourceName("unknown").
+ setResourceType(UNKNOWN.id).
+ setConfigs(new IAlterableConfigCollection(
+ util.Arrays.asList(new IAlterableConfig().setName("foo"). // single entry: SET foo=bar
+ setValue("bar").
+ setConfigOperation(OpType.SET.id())).iterator()))
+
+ def unknownLegacy(): LAlterConfigsResource = new LAlterConfigsResource(). // Fixture: legacy alter-configs resource named "unknown" whose resource type is UNKNOWN.
+ setResourceName("unknown").
+ setResourceType(UNKNOWN.id).
+ setConfigs(new LAlterableConfigCollection(
+ util.Arrays.asList(new LAlterableConfig().setName("foo"). // single entry: foo=bar
+ setValue("bar")).iterator()))
+
+ @Test // Incremental preprocess must reject a resource of type UNKNOWN with INVALID_REQUEST.
+ def testPreprocessIncrementalWithUnknownResource(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val unknown = unknownIncremental()
+ assertEquals(Collections.singletonMap(unknown,
+ new ApiError(INVALID_REQUEST, "Unknown resource type 0")), // presumably 0 is UNKNOWN.id, the type set by the fixture -- TODO confirm
+ manager.preprocess(new IncrementalAlterConfigsRequestData().
+ setResources(new IAlterConfigsResourceCollection(util.Arrays.asList(
+ unknown).iterator())),
+ (_, _) => false)) // callback returns false, yet the expected error is INVALID_REQUEST rather than an authorization failure
+ }
+
+ @Test // Legacy preprocess must reject a resource of type UNKNOWN with INVALID_REQUEST.
+ def testPreprocessLegacyWithUnknownResource(): Unit = {
+ val manager = newConfigAdminManager(1)
+ val unknown = unknownLegacy()
+ assertEquals(Collections.singletonMap(unknown,
+ new ApiError(INVALID_REQUEST, "Unknown resource type 0")), // same message as the incremental variant of this test
+ manager.preprocess(new AlterConfigsRequestData().
+ setResources(new LAlterConfigsResourceCollection(util.Arrays.asList(
+ unknown).iterator()))))
+ }
+
+ @Test // Table-style check of ConfigAdminManager.containsDuplicates over small string sequences.
+ def testContainsDuplicates(): Unit = {
+ assertFalse(ConfigAdminManager.containsDuplicates(Seq())) // empty input: no duplicates
+ assertFalse(ConfigAdminManager.containsDuplicates(Seq("foo"))) // single element: no duplicates
+ assertTrue(ConfigAdminManager.containsDuplicates(Seq("foo", "foo"))) // adjacent repeat is detected
+ assertFalse(ConfigAdminManager.containsDuplicates(Seq("foo", "bar",
"baz"))) // all distinct
+ assertTrue(ConfigAdminManager.containsDuplicates(Seq("foo", "bar", "baz",
"foo"))) // non-adjacent repeat is detected
+ }
+}
\ No newline at end of file
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 8565385..02328ee 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -412,7 +412,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
EasyMock.expectLastCall().once()
EasyMock.replay(handler)
- val configManager = new DynamicConfigManager(zkClient,
Map(ConfigType.Topic -> handler))
+ val configManager = new ZkConfigManager(zkClient, Map(ConfigType.Topic ->
handler))
// Notifications created using the old TopicConfigManager are ignored.
configManager.ConfigChangedNotificationHandler.processNotification("not
json".getBytes(StandardCharsets.UTF_8))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 794ecea..e617eab 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -34,7 +34,7 @@ import kafka.log.AppendOrigin
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache,
MockConfigRepository, ZkMetadataCache}
-import kafka.utils.{MockTime, TestUtils}
+import kafka.utils.{Log4jController, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
@@ -43,9 +43,20 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection
=> LAlterConfigsResourceCollection}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource
=> LAlterConfigsResource}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection
=> LAlterableConfigCollection}
+import
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig =>
LAlterableConfig}
+import
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
=> LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicCollection}
import
org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
=> IAlterConfigsResource}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection
=> IAlterConfigsResourceCollection}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig
=> IAlterableConfig}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection
=> IAlterableConfigCollection}
+import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
=> IAlterConfigsResourceResponse}
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import
org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition,
ListOffsetsTopic}
@@ -84,7 +95,6 @@ import scala.jdk.CollectionConverters._
import java.util.Arrays
class KafkaApisTest {
-
private val requestChannel: RequestChannel =
EasyMock.createNiceMock(classOf[RequestChannel])
private val requestChannelMetrics: RequestChannel.Metrics =
EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
private val replicaManager: ReplicaManager =
EasyMock.createNiceMock(classOf[ReplicaManager])
@@ -466,12 +476,6 @@ class KafkaApisTest {
}
@Test
- def testAlterConfigsWithForwarding(): Unit = {
- val requestBuilder = new
AlterConfigsRequest.Builder(Collections.emptyMap(), false)
- testForwardableApi(ApiKeys.ALTER_CONFIGS, requestBuilder)
- }
-
- @Test
def testElectLeadersForwarding(): Unit = {
val requestBuilder = new
ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000)
testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
@@ -643,13 +647,6 @@ class KafkaApisTest {
verify(authorizer, adminManager)
}
- @Test
- def testIncrementalAlterConfigsWithForwarding(): Unit = {
- val requestBuilder = new IncrementalAlterConfigsRequest.Builder(
- new IncrementalAlterConfigsRequestData())
- testForwardableApi(ApiKeys.INCREMENTAL_ALTER_CONFIGS, requestBuilder)
- }
-
private def getIncrementalAlterConfigRequestBuilder(configResources:
Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
val resourceMap = configResources.map(configResource => {
configResource -> Set(
@@ -4154,9 +4151,38 @@ class KafkaApisTest {
}
@Test
- def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
+ def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = { // With a KRaft metadata cache, an AlterConfigs request with no resources must produce an empty response.
+ val request = buildRequest(new AlterConfigsRequest(new
AlterConfigsRequestData(), 1.toShort)); // NOTE(review): the trailing ';' is redundant in Scala
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport =
true).handleAlterConfigsRequest)
+ val capturedResponse = expectNoThrottling(request) // presumably captures the response sent through the mocked request channel -- TODO confirm against expectNoThrottling
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
+ assertEquals(new AlterConfigsResponseData(), // a default-constructed response, i.e. no per-resource results
+ capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
+ }
+
+ @Test // With a KRaft metadata cache, a legacy AlterConfigs request carrying a null config value must be answered with INVALID_REQUEST.
+ def testInvalidLegacyAlterConfigsRequestWithKRaft(): Unit = {
+ val request = buildRequest(new AlterConfigsRequest(new
AlterConfigsRequestData().
+ setValidateOnly(true).
+ setResources(new LAlterConfigsResourceCollection(Arrays.asList(
+ new LAlterConfigsResource().
+ setResourceName(brokerId.toString). // targets this broker's own BROKER resource
+ setResourceType(BROKER.id()).
+ setConfigs(new LAlterableConfigCollection(Arrays.asList(new
LAlterableConfig().
+ setName("foo").
+ setValue(null)).iterator()))).iterator())), 1.toShort)) // the null value is what makes the request invalid
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ val capturedResponse = expectNoThrottling(request)
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ createKafkaApis(raftSupport = true).handleAlterConfigsRequest(request)
+ assertEquals(new AlterConfigsResponseData().setResponses(Arrays.asList( // expected: a single per-resource result echoing the name and type that were sent
+ new LAlterConfigsResourceResponse().
+ setErrorCode(Errors.INVALID_REQUEST.code()).
+ setErrorMessage("Null value not supported for : foo").
+ setResourceName(brokerId.toString).
+ setResourceType(BROKER.id()))),
+ capturedResponse.getValue.asInstanceOf[AlterConfigsResponse].data())
}
@Test
@@ -4166,9 +4192,38 @@ class KafkaApisTest {
}
@Test
- def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
+ def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = { // With a KRaft metadata cache, an IncrementalAlterConfigs request with no resources must produce an empty response.
+ val request = buildRequest(new IncrementalAlterConfigsRequest(new
IncrementalAlterConfigsRequestData(), 1.toShort)); // NOTE(review): the trailing ';' is redundant in Scala
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ val capturedResponse = expectNoThrottling(request)
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ createKafkaApis(raftSupport =
true).handleIncrementalAlterConfigsRequest(request)
+ assertEquals(new IncrementalAlterConfigsResponseData(), // a default-constructed response, i.e. no per-resource results
+
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
+ }
+
+ @Test // With a KRaft metadata cache, a validate-only BROKER_LOGGER change for this broker must be answered with error code 0.
+ def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
+ val request = buildRequest(new IncrementalAlterConfigsRequest(new
IncrementalAlterConfigsRequestData().
+ setValidateOnly(true).
+ setResources(new IAlterConfigsResourceCollection(Arrays.asList(new
IAlterConfigsResource().
+ setResourceName(brokerId.toString).
+ setResourceType(BROKER_LOGGER.id()).
+ setConfigs(new IAlterableConfigCollection(Arrays.asList(new
IAlterableConfig().
+ setName(Log4jController.ROOT_LOGGER). // no setConfigOperation call here, so the entry keeps the message's default operation
+ setValue("TRACE")).iterator()))).iterator())),
+ 1.toShort))
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport =
true).handleIncrementalAlterConfigsRequest)
+ val capturedResponse = expectNoThrottling(request)
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+ createKafkaApis(raftSupport =
true).handleIncrementalAlterConfigsRequest(request)
+ assertEquals(new
IncrementalAlterConfigsResponseData().setResponses(Arrays.asList(
+ new IAlterConfigsResourceResponse(). // expected: a single success result echoing the name and type that were sent
+ setErrorCode(0.toShort).
+ setErrorMessage(null).
+ setResourceName(brokerId.toString).
+ setResourceType(BROKER_LOGGER.id()))),
+
capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse].data())
}
@Test
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 37df567..3a7f3c4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -267,11 +268,16 @@ public final class QuorumController implements Controller
{
case BROKER_LOGGER:
break;
case BROKER:
+ // Cluster configs are always allowed.
+ if (configResource.name().isEmpty()) break;
+
+ // Otherwise, check that the broker ID is valid.
int brokerId;
try {
brokerId = Integer.parseInt(configResource.name());
} catch (NumberFormatException e) {
- brokerId = -1;
+ throw new InvalidRequestException("Invalid broker name
" +
+ configResource.name());
}
if
(!clusterControl.brokerRegistrations().containsKey(brokerId)) {
throw new BrokerIdNotRegisteredException("No broker
with id " +
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9d2cf77..84585f3 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -35,6 +35,8 @@ import java.util.stream.StreamSupport;
import java.util.stream.IntStream;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
@@ -69,6 +71,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
+import
org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.MetadataRecordSerde;
@@ -86,6 +89,8 @@ import org.junit.jupiter.api.Timeout;
import static java.util.concurrent.TimeUnit.HOURS;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static
org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
import static
org.apache.kafka.controller.ConfigurationControlManagerTest.CONFIGS;
import static
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
@@ -870,4 +875,38 @@ public class QuorumControllerTest {
}
}
+ @Test // Drives QuorumController.ConfigResourceExistenceChecker against an active controller with 5 registered brokers and one topic.
+ public void testConfigResourceExistenceChecker() throws Throwable {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3,
Optional.empty())) {
+ try (QuorumControllerTestEnv controlEnv =
+ new QuorumControllerTestEnv(logEnv, b ->
b.setConfigDefs(CONFIGS))) {
+ QuorumController active = controlEnv.activeController();
+ registerBrokers(active, 5); // presumably registers broker ids 0..4, which would make "3" valid and "10" invalid below -- TODO confirm against registerBrokers
+ active.createTopics(new CreateTopicsRequestData(). // creates topic "foo" and blocks on the returned future via get()
+ setTopics(new
CreatableTopicCollection(Collections.singleton(
+ new CreatableTopic().setName("foo").
+ setReplicationFactor((short) 3).
+ setNumPartitions(1)).iterator()))).get();
+ ConfigResourceExistenceChecker checker =
+ active.new ConfigResourceExistenceChecker(); // inner-class instantiation: the checker is bound to the active controller instance
+ // A ConfigResource with type=BROKER and name=(empty string)
represents
+ // the default broker resource. It is used to set cluster
configs.
+ checker.accept(new ConfigResource(BROKER, ""));
+
+ // Broker 3 exists, so we can set a configuration for it.
+ checker.accept(new ConfigResource(BROKER, "3"));
+
+ // Broker 10 does not exist, so this should throw an exception.
+ assertThrows(BrokerIdNotRegisteredException.class,
+ () -> checker.accept(new ConfigResource(BROKER, "10")));
+
+ // Topic foo exists, so we can set a configuration for it.
+ checker.accept(new ConfigResource(TOPIC, "foo"));
+
+ // Topic bar does not exist, so this should throw an exception.
+ assertThrows(UnknownTopicOrPartitionException.class,
+ () -> checker.accept(new ConfigResource(TOPIC, "bar")));
+ }
+ }
+ }
}