This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aaa976a3409 MINOR: Some metadata publishing fixes and refactors
(#13337)
aaa976a3409 is described below
commit aaa976a3409f61e1e65ed49742567c08a502d9e1
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Mar 9 14:52:40 2023 -0800
MINOR: Some metadata publishing fixes and refactors (#13337)
This PR refactors MetadataPublisher's interface a bit. There is now an
onControllerChange
callback. This is something that some publishers might want. A good example
is ZkMigrationClient.
Instead of two different publish functions (one for snapshots, one for log
deltas), we now have a single onMetadataUpdate function. Most publishers didn't
want to do anything different in those two cases.
The ones that do want to do something different for snapshots can always
check the manifest type.
The close function now has a default empty implementation, since most
publishers didn't need to do
anything there.
Move the SCRAM logic out of BrokerMetadataPublisher and run it on the
controller as well.
On the broker, simply use dynamicClientQuotaPublisher to handle dynamic
client quota changes.
That is what the controller already does, and the code is exactly the same
in both cases.
Fix the logging in FutureUtils.waitWithLogging a bit. Previously, when
invoked from BrokerServer
or ControllerServer, it did not include the standard "[Controller 123] "
style prefix indicating server
name and ID. This was confusing, especially when debugging junit tests.
Reviewers: Ron Dagostino <[email protected]>, David Arthur
<[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 33 +++++++---
.../main/scala/kafka/server/ControllerServer.scala | 53 ++++++++++------
.../main/scala/kafka/server/KafkaRaftServer.scala | 5 +-
.../server/metadata/BrokerMetadataPublisher.scala | 32 ++--------
.../metadata/DynamicClientQuotaPublisher.scala | 53 +++++-----------
.../server/metadata/DynamicConfigPublisher.scala | 53 +++++-----------
.../kafka/server/metadata/ScramPublisher.scala | 71 ++++++++++++++++++++++
.../metadata/BrokerMetadataPublisherTest.scala | 13 ++--
.../apache/kafka/image/loader/LoaderManifest.java | 36 +++++++++++
.../kafka/image/loader/LoaderManifestType.java | 27 ++++++++
.../kafka/image/loader/LogDeltaManifest.java | 7 ++-
.../apache/kafka/image/loader/MetadataLoader.java | 16 ++++-
.../kafka/image/loader/SnapshotManifest.java | 8 ++-
.../kafka/image/publisher/MetadataPublisher.java | 31 +++++-----
.../kafka/image/publisher/SnapshotGenerator.java | 21 ++++++-
.../metadata/migration/KRaftMigrationDriver.java | 23 ++++---
.../kafka/image/loader/MetadataLoaderTest.java | 26 ++++----
.../migration/KRaftMigrationDriverTest.java | 5 +-
.../org/apache/kafka/server/util/FutureUtils.java | 9 +--
.../apache/kafka/server/util/FutureUtilsTest.java | 3 +
20 files changed, 329 insertions(+), 196 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index ae7767c9d06..6b1bc710f39 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -26,7 +26,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, DynamicConfigPublisher,
KRaftMetadataCache, SnapshotWriterBuilder}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
BrokerMetadataSnapshotter, ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache,
ScramPublisher, SnapshotWriterBuilder}
import kafka.utils.CoreUtils
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -217,8 +217,9 @@ class BrokerServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new
CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
- val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
- "controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
+ val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
logIdent,
+ "controller quorum voters future",
+ sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
val controllerNodes =
RaftConfig.voterConnectionsToNodes(voterConnections).asScala
val controllerNodeProvider = RaftControllerNodeProvider(raftManager,
config, controllerNodes)
@@ -437,7 +438,8 @@ class BrokerServer(
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
- FutureUtils.waitWithLogging(logger.underlying, "broker metadata to catch
up",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "broker metadata to catch up",
lifecycleManager.initialCatchUpFuture, startupDeadline, time)
// Apply the metadata log changes that we've accumulated.
@@ -447,14 +449,22 @@ class BrokerServer(
replicaManager,
groupCoordinator,
transactionCoordinator,
- clientQuotaMetadataManager,
new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
dynamicConfigHandlers.toMap,
"broker"),
+ new DynamicClientQuotaPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ "broker",
+ clientQuotaMetadataManager),
+ new ScramPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ "broker",
+ credentialProvider),
authorizer,
- credentialProvider,
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
@@ -465,7 +475,7 @@ class BrokerServer(
// publish operation to complete. This first operation will initialize
logManager,
// replicaManager, groupCoordinator, and txnCoordinator. The log manager
may perform
// a potentially lengthy recovery-from-unclean-shutdown operation here,
if required.
- FutureUtils.waitWithLogging(logger.underlying,
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
"the broker to catch up with the current cluster metadata",
metadataListener.startPublishing(metadataPublisher), startupDeadline,
time)
@@ -489,15 +499,18 @@ class BrokerServer(
// We're now ready to unfence the broker. This also allows this broker
to transition
// from RECOVERY state to RUNNING state, once the controller unfences
the broker.
- FutureUtils.waitWithLogging(logger.underlying, "the broker to be
unfenced",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "the broker to be unfenced",
lifecycleManager.setReadyToUnfence(), startupDeadline, time)
// Block here until all the authorizer futures are complete
- FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer
futures to be completed",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*),
startupDeadline, time)
// Wait for all the SocketServer ports to be open, and the Acceptors to
be started.
- FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer
Acceptors to be started",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)
maybeChangeStatus(STARTING, STARTED)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index c231486e41e..f03a05a88b0 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,7 +27,7 @@ import
kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPoli
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.config.ConfigException
@@ -37,6 +37,7 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.LogContext
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController,
QuorumFeatures}
+import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
@@ -196,7 +197,7 @@ class ControllerServer(
alterConfigPolicy = Option(config.
getConfiguredInstance(AlterConfigPolicyClassNameProp,
classOf[AlterConfigPolicy]))
- val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
+ val voterConnections = FutureUtils.waitWithLogging(logger.underlying,
logIdent,
"controller quorum voters future",
sharedServer.controllerQuorumVotersFuture,
startupDeadline, time)
@@ -297,33 +298,51 @@ class ControllerServer(
val socketServerFuture =
socketServer.enableRequestProcessing(authorizerFutures)
// Block here until all the authorizer futures are complete
- FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer
futures to be completed",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*),
startupDeadline, time)
// Wait for all the SocketServer ports to be open, and the Acceptors to
be started.
- FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer
Acceptors to be started",
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)
// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
- // We must install the below publisher and receive the changes when we
are also running the broker role
- // because we don't share a single KafkaConfig instance with the broker,
and therefore
- // the broker's DynamicConfigPublisher won't take care of the changes
for us.
- val dynamicConfigHandlers = immutable.Map[String, ConfigHandler](
- // controllers don't host topics, so no need to do anything with
dynamic topic config changes here
- ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
- val dynamicConfigPublisher = new DynamicConfigPublisher(
+
+ val publishers = new java.util.ArrayList[MetadataPublisher]()
+
+ // Set up the dynamic config publisher. This runs even in combined mode,
since the broker
+ // has its own separate dynamic configuration object.
+ publishers.add(new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
- dynamicConfigHandlers,
- "controller")
- val dynamicClientQuotaPublisher = new DynamicClientQuotaPublisher(
+ immutable.Map[String, ConfigHandler](
+ // controllers don't host topics, so no need to do anything with
dynamic topic config changes here
+ ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)
+ ),
+ "controller"))
+
+ // Set up the client quotas publisher. This will enable controller
mutation quotas and any
+ // other quotas which are applicable.
+ publishers.add(new DynamicClientQuotaPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
- clientQuotaMetadataManager)
- FutureUtils.waitWithLogging(logger.underlying, "all of the dynamic
config and client quota publishers to be installed",
- sharedServer.loader.installPublishers(List(dynamicConfigPublisher,
dynamicClientQuotaPublisher).asJava), startupDeadline, time)
+ clientQuotaMetadataManager))
+
+ // Set up the SCRAM publisher.
+ publishers.add(new ScramPublisher(
+ config,
+ sharedServer.metadataPublishingFaultHandler,
+ "controller",
+ credentialProvider
+ ))
+
+ // Install all metadata publishers.
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "all of the metadata publishers to be installed",
+ sharedServer.loader.installPublishers(publishers), startupDeadline,
time)
} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index d11fbe99aa6..d31ee6db522 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -77,10 +77,7 @@ class KafkaRaftServer(
)
private val broker: Option[BrokerServer] = if
(config.processRoles.contains(BrokerRole)) {
- Some(new BrokerServer(
- sharedServer,
- offlineDirs
- ))
+ Some(new BrokerServer(sharedServer, offlineDirs))
} else {
None
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 38c3d1a8e13..4ad9f15beb8 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -21,7 +21,6 @@ import java.util.{OptionalInt, Properties}
import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
-import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
@@ -102,10 +101,10 @@ class BrokerMetadataPublisher(
replicaManager: ReplicaManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
- clientQuotaMetadataManager: ClientQuotaMetadataManager,
var dynamicConfigPublisher: DynamicConfigPublisher,
+ dynamicClientQuotaPublisher: DynamicClientQuotaPublisher,
+ scramPublisher: ScramPublisher,
private val _authorizer: Option[Authorizer],
- credentialProvider: CredentialProvider,
fatalFaultHandler: FaultHandler,
metadataPublishingFaultHandler: FaultHandler
) extends MetadataPublisher with Logging {
@@ -213,32 +212,13 @@ class BrokerMetadataPublisher(
}
// Apply configuration deltas.
- dynamicConfigPublisher.publish(delta, newImage)
+ dynamicConfigPublisher.onMetadataUpdate(delta, newImage)
// Apply client quotas delta.
- try {
- Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
- clientQuotaMetadataManager.update(clientQuotasDelta)
- }
- } catch {
- case t: Throwable => metadataPublishingFaultHandler.handleFault("Error
updating client " +
- s"quotas in $deltaName", t)
- }
+ dynamicClientQuotaPublisher.onMetadataUpdate(delta, newImage)
- // Apply changes to SCRAM credentials.
- Option(delta.scramDelta()).foreach { scramDelta =>
- scramDelta.changes().forEach {
- case (mechanism, userChanges) =>
- userChanges.forEach {
- case (userName, change) =>
- if (change.isPresent) {
- credentialProvider.updateCredential(mechanism, userName,
change.get().toCredential(mechanism))
- } else {
- credentialProvider.removeCredentials(mechanism, userName)
- }
- }
- }
- }
+ // Apply SCRAM delta.
+ scramPublisher.onMetadataUpdate(delta, newImage)
// Apply changes to ACLs. This needs to be handled carefully because
while we are
// applying these changes, the Authorizer is continuing to return
authorization
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
index 0533161b6d1..0ac93a46db1 100644
---
a/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
+++
b/core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala
@@ -19,7 +19,7 @@ package kafka.server.metadata
import kafka.server.KafkaConfig
import kafka.utils.Logging
-import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.fault.FaultHandler
@@ -32,7 +32,20 @@ class DynamicClientQuotaPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ override def name(): String = s"DynamicClientQuotaPublisher ${nodeType}
id=${conf.nodeId}"
+
+ override def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ manifest: LoaderManifest
+ ): Unit = {
+ onMetadataUpdate(delta, newImage)
+ }
+
+ def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ ): Unit = {
val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
try {
Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
@@ -43,40 +56,4 @@ class DynamicClientQuotaPublisher(
s"publishing dynamic client quota changes from ${deltaName}", t)
}
}
-
- /**
- * Returns the name of this publisher.
- *
- * @return The publisher name.
- */
- override def name(): String = s"DynamicClientQuotaPublisher ${nodeType}
id=${conf.nodeId}"
-
- /**
- * Publish a new cluster metadata snapshot that we loaded.
- *
- * @param delta The delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
- */
- override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage,
manifest: SnapshotManifest): Unit = {
- publish(delta, newImage)
- }
-
- /**
- * Publish a change to the cluster metadata.
- *
- * @param delta The delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
- */
- override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage,
manifest: LogDeltaManifest): Unit = {
- publish(delta, newImage)
- }
-
- /**
- * Close this metadata publisher.
- */
- override def close(): Unit = {
- // nothing to close
- }
}
diff --git
a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
index a3581350137..b5db4e246b6 100644
--- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala
@@ -22,7 +22,7 @@ import kafka.server.ConfigAdminManager.toLoggableProps
import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig}
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC}
-import org.apache.kafka.image.loader.{LogDeltaManifest, SnapshotManifest}
+import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.fault.FaultHandler
@@ -35,7 +35,20 @@ class DynamicConfigPublisher(
) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
logIdent = s"[${name()}] "
- def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ override def name(): String = s"DynamicConfigPublisher ${nodeType}
id=${conf.nodeId}"
+
+ override def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ manifest: LoaderManifest
+ ): Unit = {
+ onMetadataUpdate(delta, newImage)
+ }
+
+ def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ ): Unit = {
val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
try {
// Apply configuration deltas.
@@ -101,40 +114,4 @@ class DynamicConfigPublisher(
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
-
- /**
- * Returns the name of this publisher.
- *
- * @return The publisher name.
- */
- override def name(): String = s"DynamicConfigPublisher ${nodeType}
id=${conf.nodeId}"
-
- /**
- * Publish a new cluster metadata snapshot that we loaded.
- *
- * @param delta The delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
- */
- override def publishSnapshot(delta: MetadataDelta, newImage: MetadataImage,
manifest: SnapshotManifest): Unit = {
- publish(delta, newImage)
- }
-
- /**
- * Publish a change to the cluster metadata.
- *
- * @param delta The delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
- */
- override def publishLogDelta(delta: MetadataDelta, newImage: MetadataImage,
manifest: LogDeltaManifest): Unit = {
- publish(delta, newImage)
- }
-
- /**
- * Close this metadata publisher.
- */
- override def close(): Unit = {
- // nothing to close
- }
}
diff --git a/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
new file mode 100644
index 00000000000..535ca6e8b57
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/ScramPublisher.scala
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.security.CredentialProvider
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class ScramPublisher(
+ conf: KafkaConfig,
+ faultHandler: FaultHandler,
+ nodeType: String,
+ credentialProvider: CredentialProvider,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+ logIdent = s"[${name()}] "
+
+ override def name(): String = s"ScramPublisher ${nodeType} id=${conf.nodeId}"
+
+ override def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ manifest: LoaderManifest
+ ): Unit = {
+ onMetadataUpdate(delta, newImage)
+ }
+
+ def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ ): Unit = {
+ val deltaName = s"MetadataDelta up to
${newImage.highestOffsetAndEpoch().offset}"
+ try {
+ // Apply changes to SCRAM credentials.
+ Option(delta.scramDelta()).foreach { scramDelta =>
+ scramDelta.changes().forEach {
+ case (mechanism, userChanges) =>
+ userChanges.forEach {
+ case (userName, change) =>
+ if (change.isPresent) {
+ credentialProvider.updateCredential(mechanism, userName,
change.get().toCredential(mechanism))
+ } else {
+ credentialProvider.removeCredentials(mechanism, userName)
+ }
+ }
+ }
+ }
+ } catch {
+ case t: Throwable => faultHandler.handleFault("Uncaught exception while
" +
+ s"publishing SCRAM changes from ${deltaName}", t)
+ }
+ }
+}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index f522d11d532..d4cdda431ed 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -23,7 +23,6 @@ import java.util.Collections.{singleton, singletonList,
singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.{LogManager, UnifiedLog}
-import kafka.security.CredentialProvider
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
@@ -275,10 +274,6 @@ class BrokerMetadataPublisherTest {
val logManager = mock(classOf[LogManager])
val replicaManager = mock(classOf[ReplicaManager])
val groupCoordinator = mock(classOf[GroupCoordinator])
- val txnCoordinator = mock(classOf[TransactionCoordinator])
- val quotaManager = mock(classOf[ClientQuotaMetadataManager])
- val configPublisher = mock(classOf[DynamicConfigPublisher])
- val credentialProvider = mock(classOf[CredentialProvider])
val faultHandler = mock(classOf[FaultHandler])
val metadataPublisher = new BrokerMetadataPublisher(
@@ -287,11 +282,11 @@ class BrokerMetadataPublisherTest {
logManager,
replicaManager,
groupCoordinator,
- txnCoordinator,
- quotaManager,
- configPublisher,
+ mock(classOf[TransactionCoordinator]),
+ mock(classOf[DynamicConfigPublisher]),
+ mock(classOf[DynamicClientQuotaPublisher]),
+ mock(classOf[ScramPublisher]),
None,
- credentialProvider,
faultHandler,
faultHandler
)
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java
b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java
new file mode 100644
index 00000000000..60889997f3c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+
+/**
+ * Contains information about what was loaded.
+ */
+public interface LoaderManifest {
+ /**
+ * Describes the type of manifest which this is.
+ */
+ LoaderManifestType type();
+
+ /**
+ * The highest offset and epoch included in the new image, inclusive.
+ */
+ MetadataProvenance provenance();
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
new file mode 100644
index 00000000000..f83cadc8a26
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+
+/**
+ * Contains information about the type of a loader manifest.
+ */
+public enum LoaderManifestType {
+ LOG_DELTA,
+ SNAPSHOT;
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
index 982a1f8e271..6a8cec4e4cf 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
@@ -26,7 +26,7 @@ import java.util.Objects;
/**
* Contains information about a set of changes that were loaded from the
metadata log.
*/
-public class LogDeltaManifest {
+public class LogDeltaManifest implements LoaderManifest {
/**
* The highest offset and epoch included in this delta, inclusive.
*/
@@ -66,7 +66,12 @@ public class LogDeltaManifest {
this.numBytes = numBytes;
}
+ @Override
+ public LoaderManifestType type() {
+ return LoaderManifestType.LOG_DELTA;
+ }
+ @Override
public MetadataProvenance provenance() {
return provenance;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 21df1a761cd..d204ac32ed9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -258,7 +258,8 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
try {
log.info("Publishing initial snapshot at offset {} to {}",
image.highestOffsetAndEpoch().offset(),
publisher.name());
- publisher.publishSnapshot(delta, image, manifest);
+ publisher.onMetadataUpdate(delta, image, manifest);
+ publisher.onControllerChange(currentLeaderAndEpoch);
publishers.put(publisher.name(), publisher);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the
initial metadata " +
@@ -295,7 +296,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
log.debug("Publishing new image with provenance {}.",
image.provenance());
for (MetadataPublisher publisher : publishers.values()) {
try {
- publisher.publishLogDelta(delta, image, manifest);
+ publisher.onMetadataUpdate(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing
the new metadata " +
"image ending at " +
manifest.provenance().lastContainedOffset() +
@@ -392,7 +393,7 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
log.debug("Publishing new snapshot image with provenance {}.",
image.provenance());
for (MetadataPublisher publisher : publishers.values()) {
try {
- publisher.publishSnapshot(delta, image, manifest);
+ publisher.onMetadataUpdate(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing
the new metadata " +
"image from snapshot at offset " +
reader.lastContainedLogOffset() +
@@ -449,6 +450,15 @@ public class MetadataLoader implements
RaftClient.Listener<ApiMessageAndVersion>
public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
eventQueue.append(() -> {
currentLeaderAndEpoch = leaderAndEpoch;
+ for (MetadataPublisher publisher : publishers.values()) {
+ try {
+ publisher.onControllerChange(currentLeaderAndEpoch);
+ } catch (Throwable e) {
+ faultHandler.handleFault("Unhandled error publishing the
new leader " +
+ "change to " + currentLeaderAndEpoch + " with
publisher " +
+ publisher.name(), e);
+ }
+ }
});
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
index b6c6dcce4d5..5653a4689ea 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
@@ -25,7 +25,7 @@ import java.util.Objects;
/**
* Contains information about a snapshot that was loaded.
*/
-public class SnapshotManifest {
+public class SnapshotManifest implements LoaderManifest {
/**
* The source of this snapshot.
*/
@@ -44,6 +44,12 @@ public class SnapshotManifest {
this.elapsedNs = elapsedNs;
}
+ @Override
+ public LoaderManifestType type() {
+ return LoaderManifestType.SNAPSHOT;
+ }
+
+ @Override
public MetadataProvenance provenance() {
return provenance;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
b/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
index 8dfba7a99ab..55f158f2809 100644
---
a/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
+++
b/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
@@ -19,8 +19,8 @@ package org.apache.kafka.image.publisher;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
-import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.image.loader.SnapshotManifest;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
/**
@@ -40,33 +40,30 @@ public interface MetadataPublisher extends AutoCloseable {
String name();
/**
- * Publish a new cluster metadata snapshot that we loaded.
+ * Handle a change in the current controller.
*
- * @param delta The delta between the previous state and the new one.
- * @param newImage The complete new state.
- * @param manifest The contents of what was published.
+ * @param newLeaderAndEpoch The new quorum leader and epoch. The new
leader will be
+ * OptionalInt.empty if there is currently no
active controller.
*/
- void publishSnapshot(
- MetadataDelta delta,
- MetadataImage newImage,
- SnapshotManifest manifest
- );
+ default void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) { }
/**
- * Publish a change to the cluster metadata.
+ * Publish a new cluster metadata snapshot that we loaded.
*
* @param delta The delta between the previous state and the new one.
* @param newImage The complete new state.
- * @param manifest The contents of what was published.
+ * @param manifest A manifest which describes the contents of what was
published.
+ * If we loaded a snapshot, this will be a
SnapshotManifest.
+ * If we loaded a log delta, this will be a
LogDeltaManifest.
*/
- void publishLogDelta(
+ void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
- LogDeltaManifest manifest
+ LoaderManifest manifest
);
/**
- * Close this metadata publisher.
+ * Close this metadata publisher and free any associated resources.
*/
- void close() throws Exception;
+ default void close() throws Exception { }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
index 651acc52483..989f6299401 100644
---
a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
+++
b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.queue.EventQueue;
@@ -200,7 +201,22 @@ public class SnapshotGenerator implements
MetadataPublisher {
}
@Override
- public void publishSnapshot(
+ public void onMetadataUpdate(
+ MetadataDelta delta,
+ MetadataImage newImage,
+ LoaderManifest manifest
+ ) {
+ switch (manifest.type()) {
+ case LOG_DELTA:
+ publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
+ break;
+ case SNAPSHOT:
+ publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
+ break;
+ }
+ }
+
+ void publishSnapshot(
MetadataDelta delta,
MetadataImage newImage,
SnapshotManifest manifest
@@ -209,8 +225,7 @@ public class SnapshotGenerator implements MetadataPublisher
{
resetSnapshotCounters();
}
- @Override
- public void publishLogDelta(
+ void publishLogDelta(
MetadataDelta delta,
MetadataImage newImage,
LogDeltaManifest manifest
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 3394741f278..b29edc25807 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -23,8 +23,8 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
-import org.apache.kafka.image.loader.LogDeltaManifest;
-import org.apache.kafka.image.loader.SnapshotManifest;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.loader.LoaderManifestType;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.queue.EventQueue;
@@ -223,16 +223,21 @@ public class KRaftMigrationDriver implements
MetadataPublisher {
}
@Override
- public void publishSnapshot(MetadataDelta delta, MetadataImage newImage,
SnapshotManifest manifest) {
- enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(),
true, NO_OP_HANDLER);
+ public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
+ eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
}
@Override
- public void publishLogDelta(MetadataDelta delta, MetadataImage newImage,
LogDeltaManifest manifest) {
- if (!leaderAndEpoch.equals(manifest.leaderAndEpoch())) {
- eventQueue.append(new KRaftLeaderEvent(manifest.leaderAndEpoch()));
- }
- enqueueMetadataChangeEvent(delta, newImage, manifest.provenance(),
false, NO_OP_HANDLER);
+ public void onMetadataUpdate(
+ MetadataDelta delta,
+ MetadataImage newImage,
+ LoaderManifest manifest
+ ) {
+ enqueueMetadataChangeEvent(delta,
+ newImage,
+ manifest.provenance(),
+ manifest.type() == LoaderManifestType.SNAPSHOT,
+ NO_OP_HANDLER);
}
/**
diff --git
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index b7e49243c43..b234d36a708 100644
---
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -92,25 +92,23 @@ public class MetadataLoaderTest {
}
@Override
- public void publishSnapshot(
+ public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
- SnapshotManifest manifest
+ LoaderManifest manifest
) {
latestDelta = delta;
latestImage = newImage;
- latestSnapshotManifest = manifest;
- }
-
- @Override
- public void publishLogDelta(
- MetadataDelta delta,
- MetadataImage newImage,
- LogDeltaManifest manifest
- ) {
- latestDelta = delta;
- latestImage = newImage;
- latestLogDeltaManifest = manifest;
+ switch (manifest.type()) {
+ case LOG_DELTA:
+ latestLogDeltaManifest = (LogDeltaManifest) manifest;
+ break;
+ case SNAPSHOT:
+ latestSnapshotManifest = (SnapshotManifest) manifest;
+ break;
+ default:
+ throw new RuntimeException("Invalid manifest type " +
manifest.type());
+ }
}
@Override
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index b25f1a10a76..01b749ebc9a 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -277,8 +277,9 @@ public class KRaftMigrationDriverTest {
image = delta.apply(provenance);
// Publish a delta with this node (3000) as the leader
- driver.publishLogDelta(delta, image, new LogDeltaManifest(provenance,
- new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
+ LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+ driver.onControllerChange(newLeader);
+ driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
newLeader, 1, 100, 42));
TestUtils.waitForCondition(() -> driver.migrationState().get(1,
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
index 0d3ee81dc96..642179e81d9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/FutureUtils.java
@@ -43,18 +43,19 @@ public class FutureUtils {
*/
public static <T> T waitWithLogging(
Logger log,
+ String prefix,
String action,
CompletableFuture<T> future,
Deadline deadline,
Time time
) throws Throwable {
- log.info("Waiting for {}", action);
+ log.info("{}Waiting for {}", prefix, action);
try {
T result = time.waitForFuture(future, deadline.nanoseconds());
- log.info("Finished waiting for {}", action);
+ log.info("{}Finished waiting for {}", prefix, action);
return result;
} catch (TimeoutException t) {
- log.error("Timed out while waiting for {}", action, t);
+ log.error("{}Timed out while waiting for {}", prefix, action, t);
TimeoutException timeout = new TimeoutException("Timed out while
waiting for " + action);
timeout.setStackTrace(t.getStackTrace());
throw timeout;
@@ -63,7 +64,7 @@ public class FutureUtils {
ExecutionException executionException = (ExecutionException) t;
t = executionException.getCause();
}
- log.error("Received a fatal error while waiting for {}", action,
t);
+ log.error("{}Received a fatal error while waiting for {}", prefix,
action, t);
throw new RuntimeException("Received a fatal error while waiting
for " + action, t);
}
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java
index 8e3703b1104..f7f029a5426 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/FutureUtilsTest.java
@@ -47,6 +47,7 @@ public class FutureUtilsTest {
CompletableFuture<Integer> future = new CompletableFuture<>();
executorService.schedule(() -> future.complete(123), 1000,
TimeUnit.NANOSECONDS);
assertEquals(123, FutureUtils.waitWithLogging(log,
+ "[FutureUtilsTest] ",
"the future to be completed",
future,
Deadline.fromDelay(Time.SYSTEM, 30, TimeUnit.SECONDS),
@@ -63,6 +64,7 @@ public class FutureUtilsTest {
executorService.schedule(() -> future.complete(456), 10000,
TimeUnit.MILLISECONDS);
assertThrows(TimeoutException.class, () -> {
FutureUtils.waitWithLogging(log,
+ "[FutureUtilsTest] ",
"the future to be completed",
future,
immediateTimeout ?
@@ -84,6 +86,7 @@ public class FutureUtilsTest {
assertEquals("Received a fatal error while waiting for the future to
be completed",
assertThrows(RuntimeException.class, () -> {
FutureUtils.waitWithLogging(log,
+ "[FutureUtilsTest] ",
"the future to be completed",
future,
Deadline.fromDelay(Time.SYSTEM, 30, TimeUnit.SECONDS),