This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5d597b0fc81 MINOR: Replace BrokerMetadataListener with MetadataLoader (#13344)
5d597b0fc81 is described below
commit 5d597b0fc81a1e77905d57666069b336eba8805c
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Mar 14 10:24:37 2023 -0700
MINOR: Replace BrokerMetadataListener with MetadataLoader (#13344)
Replace BrokerMetadataListener with MetadataLoader. BrokerMetadataListener was, in a sense, an
early prototype of MetadataLoader. It constructed a MetadataImage object from records supplied by
the Raft layer.
MetadataLoader is a rewrite of this code with several improvements.
- It is in Java, and not part of the core gradle module.
- It is not broker-specific. This is especially useful in combined mode, where it means we do not
  have to have a separate, controller-specific code path for creating MetadataImage objects.
- It supports multiple publishers rather than a single publisher. The publishers can be
  dynamically added and removed.
This PR also removes BrokerMetadataSnapshotter in favor of the code path which the controller is
already using, org.apache.kafka.image.publisher.SnapshotGenerator. This means we only have to
maintain one snapshotter, rather than two.
Reviewers: David Arthur <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 115 +++---
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../src/main/scala/kafka/server/SharedServer.scala | 60 ++-
.../server/metadata/BrokerMetadataListener.scala | 395 --------------------
.../server/metadata/BrokerMetadataPublisher.scala | 27 +-
.../metadata/BrokerMetadataSnapshotter.scala | 141 -------
.../kafka/server/metadata/MetadataPublisher.scala | 38 --
.../metadata/BrokerMetadataListenerTest.scala | 413 ---------------------
.../metadata/BrokerMetadataPublisherTest.scala | 17 +-
.../metadata/BrokerMetadataSnapshotterTest.scala | 129 -------
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
11 files changed, 105 insertions(+), 1234 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 6b1bc710f39..b73eaa12fee 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,8 +25,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
import kafka.security.CredentialProvider
-import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
BrokerMetadataSnapshotter, ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache,
ScramPublisher, SnapshotWriterBuilder}
+import kafka.server.metadata.{BrokerMetadataPublisher,
ClientQuotaMetadataManager, DynamicClientQuotaPublisher,
DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
import kafka.utils.CoreUtils
import org.apache.kafka.common.feature.SupportedVersionRange
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,16 +37,15 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft
-import org.apache.kafka.raft.{RaftClient, RaftConfig}
+import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.snapshot.SnapshotWriter
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import java.net.InetAddress
@@ -56,20 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit,
TimeoutException}
import scala.collection.{Map, Seq}
-import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
-class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
- extends SnapshotWriterBuilder {
- override def build(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long):
Option[SnapshotWriter[ApiMessageAndVersion]] = {
- val snapshotId = new raft.OffsetAndEpoch(committedOffset + 1,
committedEpoch)
- raftClient.createSnapshot(snapshotId, lastContainedLogTime).asScala
- }
-}
-
/**
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
*/
@@ -146,11 +133,7 @@ class BrokerServer(
val clusterId: String = sharedServer.metaProps.clusterId
- var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
-
- var metadataListener: BrokerMetadataListener = _
-
- var metadataPublisher: BrokerMetadataPublisher = _
+ var brokerMetadataPublisher: BrokerMetadataPublisher = _
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
@@ -317,25 +300,6 @@ class BrokerServer(
ConfigType.Topic -> new TopicConfigHandler(logManager, config,
quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
- if (!config.processRoles.contains(ControllerRole)) {
- // If no controller is defined, we rely on the broker to generate
snapshots.
- metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
- config.nodeId,
- time,
- threadNamePrefix,
- new BrokerSnapshotWriterBuilder(raftManager.client)
- ))
- }
-
- metadataListener = new BrokerMetadataListener(
- config.nodeId,
- time,
- threadNamePrefix,
- config.metadataSnapshotMaxNewRecordBytes,
- metadataSnapshotter,
- sharedServer.brokerMetrics,
- sharedServer.metadataLoaderFaultHandler)
-
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
networkListeners.add(new Listener().
@@ -360,15 +324,17 @@ class BrokerServer(
config.brokerSessionTimeoutMs / 2 // KAFKA-14392
)
lifecycleManager.start(
- () => metadataListener.highestMetadataOffset,
+ () => sharedServer.loader.lastAppliedOffset(),
brokerLifecycleChannelManager,
sharedServer.metaProps.clusterId,
networkListeners,
featuresRemapped
)
-
- // Register a listener with the Raft layer to receive metadata event
notifications
- raftManager.register(metadataListener)
+ // If the BrokerLifecycleManager's initial catch-up future fails, it
means we timed out
+ // or are shutting down before we could catch up. Therefore, also fail
the firstPublishFuture.
+ lifecycleManager.initialCatchUpFuture.whenComplete((_, e) => {
+ if (e != null)
brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
+ })
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
@@ -438,12 +404,8 @@ class BrokerServer(
config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix)
- FutureUtils.waitWithLogging(logger.underlying, logIdent,
- "broker metadata to catch up",
- lifecycleManager.initialCatchUpFuture, startupDeadline, time)
-
- // Apply the metadata log changes that we've accumulated.
- metadataPublisher = new BrokerMetadataPublisher(config,
+ val publishers = new util.ArrayList[MetadataPublisher]()
+ brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
replicaManager,
@@ -467,28 +429,42 @@ class BrokerServer(
authorizer,
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler)
+ publishers.add(brokerMetadataPublisher)
- // Add all reconfigurables for config change notification before
installing the metadata publisher.
+ // Register parts of the broker that can be reconfigured via dynamic
configs. This needs to
+ // be done before we publish the dynamic configs, so that we don't miss
anything.
config.dynamicConfig.addReconfigurables(this)
- // Tell the metadata listener to start publishing its output, and wait
for the first
- // publish operation to complete. This first operation will initialize
logManager,
- // replicaManager, groupCoordinator, and txnCoordinator. The log manager
may perform
- // a potentially lengthy recovery-from-unclean-shutdown operation here,
if required.
+ // Install all the metadata publishers.
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "the broker metadata publishers to be installed",
+ sharedServer.loader.installPublishers(publishers), startupDeadline,
time)
+
+ // Wait for this broker to contact the quorum, and for the active
controller to acknowledge
+ // us as caught up. It will do this by returning a heartbeat response
with isCaughtUp set to
+ // true. The BrokerLifecycleManager tracks this.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
- "the broker to catch up with the current cluster metadata",
- metadataListener.startPublishing(metadataPublisher), startupDeadline,
time)
+ "the controller to acknowledge that we are caught up",
+ lifecycleManager.initialCatchUpFuture, startupDeadline, time)
- // Log static broker configurations.
+ // Wait for the first metadata update to be published. Metadata updates
are not published
+ // until we read at least up to the high water mark of the cluster
metadata partition.
+ // Usually, we publish the initial metadata before
lifecycleManager.initialCatchUpFuture
+ // is completed, so this check is not necessary. But this is a simple
check to make
+ // completely sure.
+ FutureUtils.waitWithLogging(logger.underlying, logIdent,
+ "the initial broker metadata update to be published",
+ brokerMetadataPublisher.firstPublishFuture , startupDeadline, time)
+
+ // Now that we have loaded some metadata, we can log a reasonably
up-to-date broker
+ // configuration. Keep in mind that KafkaConfig.originals is a mutable
field that gets set
+ // by the dynamic configuration publisher. Ironically,
KafkaConfig.originals does not
+ // contain the original configuration values.
new KafkaConfig(config.originals(), true)
// Start RemoteLogManager before broker start serving the requests.
remoteLogManager.foreach(_.startup())
- // Enable inbound TCP connections. Each endpoint will be started only
once its matching
- // authorizer future is completed.
- val socketServerFuture =
socketServer.enableRequestProcessing(authorizerFutures)
-
// If we are using a ClusterMetadataAuthorizer which stores its ACLs in
the metadata log,
// notify it that the loading process is complete.
authorizer match {
@@ -503,7 +479,11 @@ class BrokerServer(
"the broker to be unfenced",
lifecycleManager.setReadyToUnfence(), startupDeadline, time)
- // Block here until all the authorizer futures are complete
+ // Enable inbound TCP connections. Each endpoint will be started only
once its matching
+ // authorizer future is completed.
+ val enableRequestProcessingFuture =
socketServer.enableRequestProcessing(authorizerFutures)
+
+ // Block here until all the authorizer futures are complete.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*),
startupDeadline, time)
@@ -511,7 +491,7 @@ class BrokerServer(
// Wait for all the SocketServer ports to be open, and the Acceptors to
be started.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
"all of the SocketServer Acceptors to be started",
- socketServerFuture, startupDeadline, time)
+ enableRequestProcessingFuture, startupDeadline, time)
maybeChangeStatus(STARTING, STARTED)
} catch {
@@ -556,9 +536,6 @@ class BrokerServer(
}
}
- if (metadataListener != null)
- metadataListener.beginShutdown()
-
lifecycleManager.beginShutdown()
// Stop socket server to stop accepting any more connections and
requests.
@@ -573,10 +550,6 @@ class BrokerServer(
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
- if (metadataListener != null) {
- CoreUtils.swallow(metadataListener.close(), this)
- }
- metadataSnapshotter.foreach(snapshotter =>
CoreUtils.swallow(snapshotter.close(), this))
/**
* We must shutdown the scheduler early because otherwise, the scheduler
could touch other
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index f03a05a88b0..282787ac49e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -341,7 +341,7 @@ class ControllerServer(
// Install all metadata publishers.
FutureUtils.waitWithLogging(logger.underlying, logIdent,
- "all of the metadata publishers to be installed",
+ "the controller metadata publishers to be installed",
sharedServer.loader.installPublishers(publishers), startupDeadline,
time)
} catch {
case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index ed1953601bd..554207d8c21 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -249,37 +249,35 @@ class SharedServer(
)
raftManager.startup()
- if (sharedServerConfig.processRoles.contains(ControllerRole)) {
- val loaderBuilder = new MetadataLoader.Builder().
- setNodeId(metaProps.nodeId).
- setTime(time).
- setThreadNamePrefix(threadNamePrefix.getOrElse("")).
- setFaultHandler(metadataLoaderFaultHandler).
- setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
- if (brokerMetrics != null) {
- loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
- }
- loader = loaderBuilder.build()
- snapshotEmitter = new SnapshotEmitter.Builder().
- setNodeId(metaProps.nodeId).
- setRaftClient(raftManager.client).
- build()
- snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
- setNodeId(metaProps.nodeId).
- setTime(time).
- setFaultHandler(metadataPublishingFaultHandler).
-
setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
-
setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)).
- setDisabledReason(snapshotsDiabledReason).
- build()
- raftManager.register(loader)
- try {
-
loader.installPublishers(Collections.singletonList(snapshotGenerator))
- } catch {
- case t: Throwable => {
- error("Unable to install metadata publishers", t)
- throw new RuntimeException("Unable to install metadata
publishers.", t)
- }
+ val loaderBuilder = new MetadataLoader.Builder().
+ setNodeId(metaProps.nodeId).
+ setTime(time).
+ setThreadNamePrefix(threadNamePrefix.getOrElse("")).
+ setFaultHandler(metadataLoaderFaultHandler).
+ setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+ if (brokerMetrics != null) {
+ loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
+ }
+ loader = loaderBuilder.build()
+ snapshotEmitter = new SnapshotEmitter.Builder().
+ setNodeId(metaProps.nodeId).
+ setRaftClient(raftManager.client).
+ build()
+ snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
+ setNodeId(metaProps.nodeId).
+ setTime(time).
+ setFaultHandler(metadataPublishingFaultHandler).
+
setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
+
setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)).
+ setDisabledReason(snapshotsDiabledReason).
+ build()
+ raftManager.register(loader)
+ try {
+
loader.installPublishers(Collections.singletonList(snapshotGenerator))
+ } catch {
+ case t: Throwable => {
+ error("Unable to install metadata publishers", t)
+ throw new RuntimeException("Unable to install metadata
publishers.", t)
}
}
debug("Completed SharedServer startup.")
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
deleted file mode 100644
index a12f25de471..00000000000
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import kafka.utils.Logging
-
-import java.util
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CompletableFuture
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.fault.FaultHandler
-import org.apache.kafka.snapshot.SnapshotReader
-
-import java.util.concurrent.TimeUnit.NANOSECONDS
-import scala.compat.java8.OptionConverters._
-
-
-class BrokerMetadataListener(
- val brokerId: Int,
- time: Time,
- threadNamePrefix: Option[String],
- val maxBytesBetweenSnapshots: Long,
- val snapshotter: Option[MetadataSnapshotter],
- brokerMetrics: BrokerServerMetrics,
- _metadataLoadingFaultHandler: FaultHandler
-) extends RaftClient.Listener[ApiMessageAndVersion] with Logging {
-
- private val metadataFaultOccurred = new AtomicBoolean(false)
- private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
- override def handleFault(failureMessage: String, cause: Throwable):
RuntimeException = {
- // If the broker has any kind of error handling metadata records or
publishing a new image
- // we will disable taking new snapshots in order to preserve the local
metadata log. Once we
- // encounter a metadata processing error, the broker will be in an
undetermined state.
- if (metadataFaultOccurred.compareAndSet(false, true)) {
- error("Disabling metadata snapshots until this broker is restarted.")
- }
- _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
- }
- }
-
- private val logContext = new LogContext(s"[BrokerMetadataListener
id=$brokerId] ")
- private val log = logContext.logger(classOf[BrokerMetadataListener])
- logIdent = logContext.logPrefix()
-
- /**
- * The highest metadata offset that we've seen. Written only from the event
queue thread.
- */
- @volatile var _highestOffset = -1L
-
- /**
- * The highest metadata epoch that we've seen. Written only from the event
queue thread.
- */
- private var _highestEpoch = -1
-
- /**
- * The highest metadata log time that we've seen. Written only from the
event queue thread.
- */
- private var _highestTimestamp = -1L
-
- private def provenance(): MetadataProvenance =
- new MetadataProvenance(_highestOffset, _highestEpoch, _highestTimestamp)
-
- /**
- * The current broker metadata image. Accessed only from the event queue
thread.
- */
- private var _image = MetadataImage.EMPTY
-
- /**
- * The current metadata delta. Accessed only from the event queue thread.
- */
- private var _delta = new MetadataDelta(_image)
-
- /**
- * The object to use to publish new metadata changes, or None if this
listener has not
- * been activated yet. Accessed only from the event queue thread.
- */
- private var _publisher: Option[MetadataPublisher] = None
-
- /**
- * The number of bytes of records that we have read since the last snapshot
we took.
- * This does not include records we read from a snapshot.
- * Accessed only from the event queue thread.
- */
- private var _bytesSinceLastSnapshot: Long = 0L
-
- /**
- * The event queue which runs this listener.
- */
- val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
-
- /**
- * Returns the highest metadata-offset. Thread-safe.
- */
- def highestMetadataOffset: Long = _highestOffset
-
- /**
- * Handle new metadata records.
- */
- override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
- eventQueue.append(new HandleCommitsEvent(reader))
-
- class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
- extends EventQueue.FailureLoggingEvent(log) {
- override def run(): Unit = {
- val results = try {
- val loadResults = loadBatches(_delta, reader, None, None, None, None)
- if (isDebugEnabled) {
- debug(s"Loaded new commits: $loadResults")
- }
- loadResults
- } catch {
- case e: Throwable =>
- metadataLoadingFaultHandler.handleFault(s"Unable to load metadata
commits " +
- s"from the BatchReader starting at base offset
${reader.baseOffset()}", e)
- return
- } finally {
- reader.close()
- }
-
- _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-
- val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
- if (shouldTakeSnapshot.nonEmpty) {
- maybeStartSnapshot(shouldTakeSnapshot)
- }
-
- _publisher.foreach(publish)
- }
- }
-
- private def shouldSnapshot(): Set[SnapshotReason] = {
- val maybeMetadataVersionChanged = metadataVersionChanged.toSet
-
- if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
- maybeMetadataVersionChanged +
SnapshotReason.maxBytesExceeded(_bytesSinceLastSnapshot,
maxBytesBetweenSnapshots)
- } else {
- maybeMetadataVersionChanged
- }
- }
-
- private def metadataVersionChanged: Option[SnapshotReason] = {
- // The _publisher is empty before starting publishing, and we won't
compute feature delta
- // until we starting publishing
- if (_publisher.nonEmpty) {
- Option(_delta.featuresDelta()).flatMap { featuresDelta =>
- featuresDelta
- .metadataVersionChange()
- .asScala
- .map(SnapshotReason.metadataVersionChanged)
- }
- } else {
- None
- }
- }
-
- private def maybeStartSnapshot(reason: Set[SnapshotReason]): Unit = {
- snapshotter.foreach { snapshotter =>
- if (metadataFaultOccurred.get()) {
- trace("Not starting metadata snapshot since we previously had an
error")
- } else if (snapshotter.maybeStartSnapshot(_highestTimestamp,
_delta.apply(provenance()), reason)) {
- _bytesSinceLastSnapshot = 0L
- }
- }
- }
-
- /**
- * Handle metadata snapshots
- */
- override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]):
Unit =
- eventQueue.append(new HandleSnapshotEvent(reader))
-
- class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
- extends EventQueue.FailureLoggingEvent(log) {
- override def run(): Unit = {
- val snapshotName =
s"${reader.snapshotId().offset}-${reader.snapshotId().epoch}"
- try {
- info(s"Loading snapshot ${snapshotName}")
- _delta = new MetadataDelta(_image) // Discard any previous deltas.
- val loadResults = loadBatches(_delta,
- reader,
- Some(reader.lastContainedLogTimestamp),
- Some(reader.lastContainedLogOffset),
- Some(reader.lastContainedLogEpoch),
- Some(snapshotName))
- try {
- _delta.finishSnapshot()
- } catch {
- case e: Throwable => metadataLoadingFaultHandler.handleFault(
- s"Error finishing snapshot ${snapshotName}", e)
- }
- info(s"Loaded snapshot ${snapshotName}: ${loadResults}")
- } catch {
- case t: Throwable => metadataLoadingFaultHandler.handleFault("Uncaught
exception while " +
- s"loading broker metadata from Metadata snapshot ${snapshotName}", t)
- } finally {
- reader.close()
- }
- _publisher.foreach(publish)
- }
- }
-
- case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs:
Long, numBytes: Long) {
- override def toString: String = {
- s"$numBatches batch(es) with $numRecords record(s) in $numBytes bytes " +
- s"ending at offset $highestMetadataOffset in $elapsedUs microseconds"
- }
- }
-
- /**
- * Load and replay the batches to the metadata delta.
- *
- * When loading and replay a snapshot the appendTimestamp and snapshotId
parameter should be provided.
- * In a snapshot the append timestamp, offset and epoch reported by the
batch is independent of the ones
- * reported by the metadata log.
- *
- * @param delta metadata delta on which to replay the records
- * @param iterator sequence of metadata record bacthes to replay
- * @param lastAppendTimestamp optional append timestamp to use instead of
the batches timestamp
- * @param lastCommittedOffset optional offset to use instead of the batches
offset
- * @param lastCommittedEpoch optional epoch to use instead of the batches
epoch
- */
- private def loadBatches(
- delta: MetadataDelta,
- iterator: util.Iterator[Batch[ApiMessageAndVersion]],
- lastAppendTimestamp: Option[Long],
- lastCommittedOffset: Option[Long],
- lastCommittedEpoch: Option[Int],
- snapshotName: Option[String]
- ): BatchLoadResults = {
- val startTimeNs = time.nanoseconds()
- var numBatches = 0
- var numRecords = 0
- var numBytes = 0L
-
- while (iterator.hasNext) {
- val batch = iterator.next()
-
- _highestEpoch = lastCommittedEpoch.getOrElse(batch.epoch())
- _highestTimestamp =
lastAppendTimestamp.getOrElse(batch.appendTimestamp())
-
- var index = 0
- batch.records().forEach { messageAndVersion =>
- if (isTraceEnabled) {
- trace(s"Metadata batch ${batch.lastOffset}: processing [${index +
1}/${batch.records.size}]:" +
- s" ${messageAndVersion.message}")
- }
- _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() +
index)
- try {
- delta.replay(messageAndVersion.message())
- } catch {
- case e: Throwable => snapshotName match {
- case None => metadataLoadingFaultHandler.handleFault(
- s"Error replaying metadata log record at offset
${_highestOffset}", e)
- case Some(name) => metadataLoadingFaultHandler.handleFault(
- s"Error replaying record ${index} from snapshot ${name} at
offset ${_highestOffset}", e)
- }
- } finally {
- numRecords += 1
- index += 1
- }
- }
- numBytes = numBytes + batch.sizeInBytes()
- brokerMetrics.updateBatchSize(batch.records().size())
- numBatches = numBatches + 1
- }
-
- val endTimeNs = time.nanoseconds()
- val elapsedNs = endTimeNs - startTimeNs
- brokerMetrics.updateBatchProcessingTime(elapsedNs)
- BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs),
numBytes)
- }
-
- def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] =
{
- val event = new StartPublishingEvent(publisher)
- eventQueue.append(event)
- event.future
- }
-
- class StartPublishingEvent(publisher: MetadataPublisher)
- extends EventQueue.FailureLoggingEvent(log) {
- val future = new CompletableFuture[Void]()
-
- override def run(): Unit = {
- _publisher = Some(publisher)
- log.info(s"Starting to publish metadata events at offset
$highestMetadataOffset.")
- try {
- // Generate a snapshot if the metadata version changed
- metadataVersionChanged.foreach(reason =>
maybeStartSnapshot(Set(reason)))
- publish(publisher)
- future.complete(null)
- } catch {
- case e: Throwable =>
- future.completeExceptionally(e)
- throw e
- }
- }
- }
-
- // This is used in tests to alter the publisher that is in use by the broker.
- def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
- val event = new AlterPublisherEvent(publisher)
- eventQueue.append(event)
- event.future
- }
-
- class AlterPublisherEvent(publisher: MetadataPublisher)
- extends EventQueue.FailureLoggingEvent(log) {
- val future = new CompletableFuture[Void]()
-
- override def run(): Unit = {
- _publisher = Some(publisher)
- log.info(s"Set publisher to ${publisher}")
- future.complete(null)
- }
- }
-
- private def publish(publisher: MetadataPublisher): Unit = {
- val delta = _delta
- try {
- _image = _delta.apply(provenance())
- } catch {
- case t: Throwable =>
- // If we cannot apply the delta, this publish event will throw and we
will not publish a new image.
- // The broker will continue applying metadata records and attempt to
publish again.
- throw metadataLoadingFaultHandler.handleFault(s"Error applying
metadata delta $delta", t)
- }
-
- _delta = new MetadataDelta(_image)
- if (isDebugEnabled) {
- debug(s"Publishing new metadata delta $delta at offset
${_image.highestOffsetAndEpoch().offset}.")
- }
-
- // This publish call is done with its own try-catch and fault handler
- publisher.publish(delta, _image)
-
- // Update the metrics since the publisher handled the lastest image
- brokerMetrics.updateLastAppliedImageProvenance(_image.provenance())
- }
-
- override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
- // Nothing to do.
- }
-
- override def beginShutdown(): Unit = {
- eventQueue.beginShutdown("beginShutdown")
- }
-
- def close(): Unit = {
- beginShutdown()
- eventQueue.close()
- }
-
- // VisibleForTesting
- private[kafka] def getImageRecords():
CompletableFuture[util.List[ApiMessageAndVersion]] = {
- val future = new CompletableFuture[util.List[ApiMessageAndVersion]]()
- eventQueue.append(new GetImageRecordsEvent(future))
- future
- }
-
- class GetImageRecordsEvent(future:
CompletableFuture[util.List[ApiMessageAndVersion]])
- extends EventQueue.FailureLoggingEvent(log) {
- override def run(): Unit = {
- val writer = new RecordListWriter()
- val options = new ImageWriterOptions.Builder().
- setMetadataVersion(_image.features().metadataVersion()).
- build()
- try {
- _image.write(writer, options)
- } finally {
- writer.close()
- }
- future.complete(writer.records())
- }
- }
-}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 4ad9f15beb8..e1bf2e89607 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -18,19 +18,22 @@
package kafka.server.metadata
import java.util.{OptionalInt, Properties}
-import java.util.concurrent.atomic.AtomicLong
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta,
TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.fault.FaultHandler
+import java.util.concurrent.CompletableFuture
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -123,11 +126,17 @@ class BrokerMetadataPublisher(
var _firstPublish = true
/**
- * This is updated after all components (e.g. LogManager) has finished publishing the new metadata delta
+ * A future that is completed when we first publish.
*/
- val publishedOffsetAtomic = new AtomicLong(-1)
+ val firstPublishFuture = new CompletableFuture[Void]
- override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+ override def name(): String = "BrokerMetadataPublisher"
+
+ override def onMetadataUpdate(
+ delta: MetadataDelta,
+ newImage: MetadataImage,
+ manifest: LoaderManifest
+ ): Unit = {
val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
val deltaName = if (_firstPublish) {
@@ -269,12 +278,12 @@ class BrokerMetadataPublisher(
if (_firstPublish) {
finishInitializingReplicaManager(newImage)
}
- publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
s"publishing broker metadata from $deltaName", t)
} finally {
_firstPublish = false
+ firstPublishFuture.complete(null)
}
}
@@ -285,8 +294,6 @@ class BrokerMetadataPublisher(
}
}
- override def publishedOffset: Long = publishedOffsetAtomic.get()
-
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
}
@@ -392,5 +399,9 @@ class BrokerMetadataPublisher(
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error starting high " +
"watermark checkpoint thread during startup", t)
}
-}
+ }
+
+ override def close(): Unit = {
+ firstPublishFuture.completeExceptionally(new TimeoutException())
+ }
}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
deleted file mode 100644
index b8828722559..00000000000
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import java.util.concurrent.RejectedExecutionException
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.image.MetadataImage
-import org.apache.kafka.image.writer.{ImageWriterOptions, RaftSnapshotWriter}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.snapshot.SnapshotWriter
-import scala.jdk.CollectionConverters._
-
-trait SnapshotWriterBuilder {
- def build(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long):
Option[SnapshotWriter[ApiMessageAndVersion]]
-}
-
-class BrokerMetadataSnapshotter(
- brokerId: Int,
- val time: Time,
- threadNamePrefix: Option[String],
- writerBuilder: SnapshotWriterBuilder
-) extends Logging with MetadataSnapshotter {
- /**
- * The maximum number of records we will put in each batch.
- *
- * From the perspective of the Raft layer, the limit on batch size is
specified in terms of
- * bytes, not number of records. @See {@link
KafkaRaftClient#MAX_BATCH_SIZE_BYTES} for details.
- * However, it's more convenient to limit the batch size here in terms of
number of records.
- * So we chose a low number that will not cause problems.
- */
- private val maxRecordsInBatch = 1024
-
- private val logContext = new LogContext(s"[BrokerMetadataSnapshotter
id=$brokerId] ")
- logIdent = logContext.logPrefix()
-
- /**
- * The offset of the snapshot in progress, or -1 if there isn't one.
Accessed only under
- * the object lock.
- */
- private var _currentSnapshotOffset = -1L
-
- /**
- * The event queue which runs this listener.
- */
- val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""), new ShutdownEvent())
-
- override def maybeStartSnapshot(
- lastContainedLogTime: Long,
- image: MetadataImage,
- snapshotReasons: Set[SnapshotReason]
- ): Boolean = synchronized {
- if (_currentSnapshotOffset != -1) {
- info(s"Declining to create a new snapshot at
${image.highestOffsetAndEpoch} because " +
- s"there is already a snapshot in progress at offset
${_currentSnapshotOffset}")
- false
- } else {
- val writer = writerBuilder.build(
- image.highestOffsetAndEpoch().offset,
- image.highestOffsetAndEpoch().epoch,
- lastContainedLogTime
- )
- if (writer.nonEmpty) {
- _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
-
- val snapshotReasonsMessage =
SnapshotReason.stringFromReasons(snapshotReasons.asJava)
- info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch}
because: $snapshotReasonsMessage")
- eventQueue.append(new CreateSnapshotEvent(image, writer.get))
- true
- } else {
- info(s"Declining to create a new snapshot at
${image.highestOffsetAndEpoch()} because " +
- s"there is already a snapshot at offset
${image.highestOffsetAndEpoch().offset}")
- false
- }
- }
- }
-
- class CreateSnapshotEvent(
- image: MetadataImage,
- snapshotWriter: SnapshotWriter[ApiMessageAndVersion]
- ) extends EventQueue.Event {
-
- override def run(): Unit = {
- val writer = new RaftSnapshotWriter(snapshotWriter, maxRecordsInBatch)
- val options = new ImageWriterOptions.Builder().
- setMetadataVersion(image.features().metadataVersion()).
- build()
- try {
- image.write(writer, options)
- } finally {
- try {
- writer.close()
- } finally {
- BrokerMetadataSnapshotter.this.synchronized {
- _currentSnapshotOffset = -1L
- }
- }
- }
- }
-
- override def handleException(e: Throwable): Unit = {
- e match {
- case _: RejectedExecutionException =>
- info("Not processing CreateSnapshotEvent because the event queue is
closed.")
- case _ => error("Unexpected error handling CreateSnapshotEvent", e)
- }
- }
- }
-
- def beginShutdown(): Unit = {
- eventQueue.beginShutdown("beginShutdown");
- }
-
- class ShutdownEvent() extends EventQueue.Event {
- override def run(): Unit = {
- }
- }
-
- def close(): Unit = {
- beginShutdown()
- eventQueue.close()
- }
-}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
deleted file mode 100644
index b63a2c056c0..00000000000
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-
-/**
- * An object which publishes a new metadata image.
- */
-trait MetadataPublisher {
- /**
- * Publish a new metadata image.
- *
- * @param delta The delta between the old image and the new
one.
- * @param newImage The new image, which is the result of
applying the
- * delta to the previous image.
- */
- def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
-
- /**
- * The highest offset of metadata topic which has been published
- */
- def publishedOffset: Long
-}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
deleted file mode 100644
index e559a6b753d..00000000000
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util
-import java.util.concurrent.atomic.AtomicReference
-import java.util.{Collections, Optional}
-import org.apache.kafka.common.metadata.{FeatureLevelRecord,
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils,
VersionRange}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.jdk.CollectionConverters._
-
-class BrokerMetadataListenerTest {
- private val metadataLoadingFaultHandler = new MockFaultHandler("metadata
loading")
-
- @AfterEach
- def verifyNoFaults(): Unit = {
- metadataLoadingFaultHandler.maybeRethrowFirstException()
- }
-
- private def newBrokerMetadataListener(
- metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
- snapshotter: Option[MetadataSnapshotter] = None,
- maxBytesBetweenSnapshots: Long = 1000000L,
- faultHandler: FaultHandler = metadataLoadingFaultHandler
- ): BrokerMetadataListener = {
- new BrokerMetadataListener(
- brokerId = 0,
- time = Time.SYSTEM,
- threadNamePrefix = None,
- maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
- snapshotter = snapshotter,
- brokerMetrics = metrics,
- _metadataLoadingFaultHandler = faultHandler
- )
- }
-
- @Test
- def testCreateAndClose(): Unit = {
- val listener = newBrokerMetadataListener()
- listener.close()
- }
-
- @Test
- def testPublish(): Unit = {
- val metrics = BrokerServerMetrics(new Metrics())
- val listener = newBrokerMetadataListener(metrics = metrics)
- try {
- val unfencedTimestamp = 300L
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- 100,
- unfencedTimestamp,
- util.Arrays.asList(new ApiMessageAndVersion(new
RegisterBrokerRecord().
- setBrokerId(0).
- setBrokerEpoch(100L).
- setFenced(false).
- setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")),
0.toShort))
- )
- )
- val imageRecords = listener.getImageRecords().get()
- assertEquals(0, imageRecords.size())
- assertEquals(100L, listener.highestMetadataOffset)
- assertEquals(-1L, metrics.lastAppliedOffset())
- assertEquals(-1L, metrics.lastAppliedTimestamp())
- assertEquals(0L, metrics.metadataLoadErrorCount.get)
- assertEquals(0L, metrics.metadataApplyErrorCount.get)
-
- val fencedTimestamp = 500L
- val fencedLastOffset = 200L
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- fencedLastOffset,
- fencedTimestamp,
- util.Arrays.asList(new ApiMessageAndVersion(new
RegisterBrokerRecord().
- setBrokerId(1).
- setBrokerEpoch(200L).
- setFenced(true).
- setRack(null).
- setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")),
0.toShort))
- )
- )
- listener.startPublishing(new MetadataPublisher {
- override def publish(delta: MetadataDelta, newImage: MetadataImage):
Unit = {
- assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
- assertEquals(new BrokerRegistration(0, 100L,
- Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"),
Collections.emptyList[Endpoint](),
- Collections.emptyMap[String, VersionRange](),
Optional.empty[String](), false, false),
- delta.clusterDelta().broker(0))
- assertEquals(new BrokerRegistration(1, 200L,
- Uuid.fromString("QkOQtNKVTYatADcaJ28xDg"),
Collections.emptyList[Endpoint](),
- Collections.emptyMap[String, VersionRange](),
Optional.empty[String](), true, false),
- delta.clusterDelta().broker(1))
- }
-
- override def publishedOffset: Long = -1
- }).get()
-
- assertEquals(fencedLastOffset, metrics.lastAppliedOffset())
- assertEquals(fencedTimestamp, metrics.lastAppliedTimestamp())
- assertEquals(0L, metrics.metadataLoadErrorCount.get)
- assertEquals(0L, metrics.metadataApplyErrorCount.get)
- } finally {
- listener.close()
- }
- }
-
- class MockMetadataSnapshotter extends MetadataSnapshotter {
- var image = MetadataImage.EMPTY
- val failure = new AtomicReference[Throwable](null)
- var activeSnapshotOffset = -1L
- var prevCommittedOffset = -1L
- var prevCommittedEpoch = -1
- var prevLastContainedLogTime = -1L
-
- override def maybeStartSnapshot(lastContainedLogTime: Long, newImage:
MetadataImage, reason: Set[SnapshotReason]): Boolean = {
- try {
- if (activeSnapshotOffset == -1L) {
- assertTrue(prevCommittedOffset <=
newImage.highestOffsetAndEpoch().offset)
- assertTrue(prevCommittedEpoch <=
newImage.highestOffsetAndEpoch().epoch)
- assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
- prevCommittedOffset = newImage.highestOffsetAndEpoch().offset
- prevCommittedEpoch = newImage.highestOffsetAndEpoch().epoch
- prevLastContainedLogTime = lastContainedLogTime
- image = newImage
- activeSnapshotOffset = newImage.highestOffsetAndEpoch().offset
- true
- } else {
- false
- }
- } catch {
- case t: Throwable => failure.compareAndSet(null, t)
- }
- }
- }
-
- class MockMetadataPublisher extends MetadataPublisher {
- var image = MetadataImage.EMPTY
-
- override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
= {
- image = newImage
- }
-
- override def publishedOffset: Long = -1
- }
-
- private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
- private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
-
- private def generateManyRecords(listener: BrokerMetadataListener,
- endOffset: Long): Unit = {
- (0 to 10000).foreach { _ =>
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- endOffset,
- 0,
- util.Arrays.asList(
- new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
- new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setRemovingReplicas(Collections.emptyList()), 0.toShort)
- )
- )
- )
- }
- listener.getImageRecords().get()
- }
-
- private def generateBadRecords(listener: BrokerMetadataListener,
- endOffset: Long): Unit = {
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- endOffset,
- 0,
- util.Arrays.asList(
- new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(BAR_ID).
- setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
- new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(BAR_ID).
- setRemovingReplicas(Collections.emptyList()), 0.toShort)
- )
- )
- )
- listener.getImageRecords().get()
- }
-
- @Test
- def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
- val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
- try {
- val brokerIds = 0 to 3
-
- registerBrokers(listener, brokerIds, endOffset = 100L)
- createTopicWithOnePartition(listener, replicas = brokerIds, endOffset =
200L)
- listener.getImageRecords().get()
- assertEquals(200L, listener.highestMetadataOffset)
-
- generateManyRecords(listener, endOffset = 1000L)
- assertEquals(1000L, listener.highestMetadataOffset)
- } finally {
- listener.close()
- }
- }
-
- @Test
- def testCreateSnapshot(): Unit = {
- val snapshotter = new MockMetadataSnapshotter()
- val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
- maxBytesBetweenSnapshots = 1000L)
- try {
- val brokerIds = 0 to 3
-
- registerBrokers(listener, brokerIds, endOffset = 100L)
- createTopicWithOnePartition(listener, replicas = brokerIds, endOffset =
200L)
- listener.getImageRecords().get()
- assertEquals(200L, listener.highestMetadataOffset)
-
- // Check that we generate at least one snapshot once we see enough
records.
- assertEquals(-1L, snapshotter.prevCommittedOffset)
- generateManyRecords(listener, 1000L)
- assertEquals(1000L, snapshotter.prevCommittedOffset)
- assertEquals(1000L, snapshotter.activeSnapshotOffset)
- snapshotter.activeSnapshotOffset = -1L
-
- // Test creating a new snapshot after publishing it.
- val publisher = new MockMetadataPublisher()
- listener.startPublishing(publisher).get()
- generateManyRecords(listener, 2000L)
- listener.getImageRecords().get()
- assertEquals(2000L, snapshotter.activeSnapshotOffset)
- assertEquals(2000L, snapshotter.prevCommittedOffset)
-
- // Test how we handle the snapshotter returning false.
- generateManyRecords(listener, 3000L)
- assertEquals(2000L, snapshotter.activeSnapshotOffset)
- generateManyRecords(listener, 4000L)
- assertEquals(2000L, snapshotter.activeSnapshotOffset)
- snapshotter.activeSnapshotOffset = -1L
- generateManyRecords(listener, 5000L)
- assertEquals(5000L, snapshotter.activeSnapshotOffset)
- assertEquals(null, snapshotter.failure.get())
- } finally {
- listener.close()
- }
- }
-
- @Test
- def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
- val snapshotter = new MockMetadataSnapshotter()
- val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
- maxBytesBetweenSnapshots = 1000L)
-
- updateFeature(listener, feature = MetadataVersion.FEATURE_NAME,
MetadataVersion.latest.featureLevel(), 100L)
- listener.getImageRecords().get()
- assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate
snapshot on metadata version change before starting publishing")
- }
-
- @Test
- def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
- val snapshotter = new MockMetadataSnapshotter()
- val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
- maxBytesBetweenSnapshots = 1000L)
-
- val endOffset = 100L
- updateFeature(listener, feature = MetadataVersion.FEATURE_NAME,
MetadataVersion.latest.featureLevel(), endOffset)
- listener.startPublishing(new MockMetadataPublisher()).get()
- assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try
to generate snapshot when starting publishing")
- }
-
- @Test
- def testSnapshotAfterMetadataVersionChange(): Unit = {
- val snapshotter = new MockMetadataSnapshotter()
- val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
- maxBytesBetweenSnapshots = 1000L)
- listener.startPublishing(new MockMetadataPublisher()).get()
-
- val endOffset = 100L
- updateFeature(listener, feature = MetadataVersion.FEATURE_NAME,
(MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
- // Waiting for the metadata version update to get processed
- listener.getImageRecords().get()
- assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should
generate snapshot on feature update")
- }
-
- @Test
- def testNoSnapshotAfterError(): Unit = {
- val snapshotter = new MockMetadataSnapshotter()
- val faultHandler = new MockFaultHandler("metadata loading")
-
- val listener = newBrokerMetadataListener(
- snapshotter = Some(snapshotter),
- maxBytesBetweenSnapshots = 1000L,
- faultHandler = faultHandler)
- try {
- val brokerIds = 0 to 3
-
- registerBrokers(listener, brokerIds, endOffset = 100L)
- createTopicWithOnePartition(listener, replicas = brokerIds, endOffset =
200L)
- listener.getImageRecords().get()
- assertEquals(200L, listener.highestMetadataOffset)
- assertEquals(-1L, snapshotter.prevCommittedOffset)
- assertEquals(-1L, snapshotter.activeSnapshotOffset)
-
- // Append invalid records that will normally trigger a snapshot
- generateBadRecords(listener, 1000L)
- assertEquals(-1L, snapshotter.prevCommittedOffset)
- assertEquals(-1L, snapshotter.activeSnapshotOffset)
-
- // Generate some records that will not throw an error, verify still no
snapshots
- generateManyRecords(listener, 2000L)
- assertEquals(-1L, snapshotter.prevCommittedOffset)
- assertEquals(-1L, snapshotter.activeSnapshotOffset)
- } finally {
- listener.close()
- }
- }
-
- private def registerBrokers(
- listener: BrokerMetadataListener,
- brokerIds: Iterable[Int],
- endOffset: Long
- ): Unit = {
- brokerIds.foreach { brokerId =>
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- endOffset,
- 0,
- util.Arrays.asList(new ApiMessageAndVersion(new
RegisterBrokerRecord().
- setBrokerId(brokerId).
- setBrokerEpoch(100L).
- setFenced(false).
- setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" +
brokerId)), 0.toShort))
- )
- )
- }
- }
-
- private def createTopicWithOnePartition(
- listener: BrokerMetadataListener,
- replicas: Seq[Int],
- endOffset: Long
- ): Unit = {
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- endOffset,
- 0,
- util.Arrays.asList(
- new ApiMessageAndVersion(new TopicRecord().
- setName("foo").
- setTopicId(FOO_ID), 0.toShort),
- new ApiMessageAndVersion(new PartitionRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setIsr(replicas.map(Int.box).asJava).
- setLeader(0).
- setReplicas(replicas.map(Int.box).asJava), 0.toShort)
- )
- )
- )
- }
-
- private def updateFeature(
- listener: BrokerMetadataListener,
- feature: String,
- version: Short,
- endOffset: Long
- ): Unit = {
- listener.handleCommit(
- RecordTestUtils.mockBatchReader(
- endOffset,
- 0,
- util.Arrays.asList(
- new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(feature).
- setFeatureLevel(version), 0.toShort)
- )
- )
- )
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index d4cdda431ed..40513bba270 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -33,9 +33,11 @@ import org.apache.kafka.common.config.ConfigResource.Type.BROKER
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, TopicImage, TopicsImage}
+import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.fault.FaultHandler
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -45,6 +47,7 @@ import org.mockito.Mockito.{doThrow, mock, verify}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._
class BrokerMetadataPublisherTest {
@@ -208,7 +211,7 @@ class BrokerMetadataPublisherTest {
thenAnswer(new Answer[Unit]() {
override def answer(invocation: InvocationOnMock): Unit =
numTimesReloadCalled.addAndGet(1)
})
- broker.metadataPublisher.dynamicConfigPublisher = publisher
+ broker.brokerMetadataPublisher.dynamicConfigPublisher = publisher
val admin = Admin.create(cluster.clientProperties())
try {
assertEquals(0, numTimesReloadCalled.get())
@@ -246,11 +249,12 @@ class BrokerMetadataPublisherTest {
cluster.waitForReadyBrokers()
val broker = cluster.brokers().values().iterator().next()
TestUtils.retry(60000) {
- assertNotNull(broker.metadataPublisher)
+ assertNotNull(broker.brokerMetadataPublisher)
}
- val publisher = Mockito.spy(broker.metadataPublisher)
+ val publisher = Mockito.spy(broker.brokerMetadataPublisher)
doThrow(new RuntimeException("injected failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any())
- broker.metadataListener.alterPublisher(publisher).get()
+ broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1, TimeUnit.MINUTES)
+ broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1, TimeUnit.MINUTES)
val admin = Admin.create(cluster.clientProperties())
try {
admin.createTopics(singletonList(new NewTopic("foo", 1,
1.toShort))).all().get()
@@ -296,7 +300,8 @@ class BrokerMetadataPublisherTest {
.setImage(image)
.build()
- metadataPublisher.publish(delta, image)
+ metadataPublisher.onMetadataUpdate(delta, image,
+ new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 1, 100, 42));
verify(groupCoordinator).onNewMetadataImage(image, delta)
}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
deleted file mode 100644
index c086c3dd8b2..00000000000
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.nio.ByteBuffer
-import java.util.Optional
-import java.util.concurrent.{CompletableFuture, CountDownLatch}
-import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.protocol.ByteBufferAccessor
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataImageTest, MetadataProvenance}
-import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.EventQueue
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.snapshot.{MockRawSnapshotWriter,
RecordsSnapshotWriter, SnapshotWriter}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.Timeout
-import org.junit.jupiter.api.Test
-
-import java.util
-import scala.compat.java8.OptionConverters._
-
-class BrokerMetadataSnapshotterTest {
- @Test
- def testCreateAndClose(): Unit = {
- val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
- (_, _, _) => throw new RuntimeException("unimplemented"))
- snapshotter.close()
- }
-
- class MockSnapshotWriterBuilder extends SnapshotWriterBuilder {
- var image = new CompletableFuture[MetadataImage]
-
- override def build(committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long):
Option[SnapshotWriter[ApiMessageAndVersion]] = {
- val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
- RecordsSnapshotWriter.createWithHeader(
- () => {
- Optional.of(
- new MockRawSnapshotWriter(offsetAndEpoch,
consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
- )
- },
- 4096,
- MemoryPool.NONE,
- Time.SYSTEM,
- lastContainedLogTime,
- CompressionType.NONE,
- MetadataRecordSerde.INSTANCE
- ).asScala
- }
-
- def consumeSnapshotBuffer(
- committedOffset: Long,
- committedEpoch: Int,
- lastContainedLogTime: Long
- )(buffer: ByteBuffer): Unit = {
- val delta = new MetadataDelta(MetadataImage.EMPTY)
- val memoryRecords = MemoryRecords.readableRecords(buffer)
- val batchIterator = memoryRecords.batchIterator()
- while (batchIterator.hasNext) {
- val batch = batchIterator.next()
- if (!batch.isControlBatch()) {
- batch.forEach(record => {
- val recordBuffer = record.value().duplicate()
- val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
- new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
- delta.replay(messageAndVersion.message())
- })
- }
- }
- image.complete(delta.apply(new MetadataProvenance(committedOffset,
committedEpoch, lastContainedLogTime)))
- }
- }
-
- class BlockingEvent extends EventQueue.Event {
- val latch = new CountDownLatch(1)
- override def run(): Unit = latch.await()
- }
-
- @Test
- @Timeout(30)
- def testCreateSnapshot(): Unit = {
- val writerBuilder = new MockSnapshotWriterBuilder()
- val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
writerBuilder)
-
- try {
- val blockingEvent = new BlockingEvent()
- val reasons = Set(SnapshotReason.UNKNOWN)
-
- snapshotter.eventQueue.append(blockingEvent)
- assertTrue(snapshotter.maybeStartSnapshot(2000L,
MetadataImageTest.IMAGE1, reasons))
- assertFalse(snapshotter.maybeStartSnapshot(4000L,
MetadataImageTest.IMAGE2, reasons))
- blockingEvent.latch.countDown()
- assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
- } finally {
- snapshotter.close()
- }
- }
-
- class MockSnapshotWriter extends SnapshotWriter[ApiMessageAndVersion] {
- val batches = new util.ArrayList[util.List[ApiMessageAndVersion]]
- override def snapshotId(): OffsetAndEpoch = new OffsetAndEpoch(0, 0)
- override def lastContainedLogOffset(): Long = 0
- override def lastContainedLogEpoch(): Int = 0
- override def isFrozen: Boolean = false
- override def append(batch: util.List[ApiMessageAndVersion]): Unit =
batches.add(batch)
- override def freeze(): Unit = {}
- override def close(): Unit = {}
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fd463420d17..e0b65b90cff 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1244,7 +1244,7 @@ object TestUtils extends Logging {
TestUtils.waitUntilTrue(
() => {
brokers.forall { broker =>
- val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+ val metadataOffset = broker.asInstanceOf[BrokerServer].sharedServer.loader.lastAppliedOffset()
metadataOffset >= controllerOffset
}
}, msg)