This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5d597b0fc81 MINOR: Replace BrokerMetadataListener with MetadataLoader 
(#13344)
5d597b0fc81 is described below

commit 5d597b0fc81a1e77905d57666069b336eba8805c
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Mar 14 10:24:37 2023 -0700

    MINOR: Replace BrokerMetadataListener with MetadataLoader (#13344)
    
    Replace BrokerMetadataListener with MetadataLoader. BrokerMetadataListener 
was, in a sense, an
    early prototype of MetadataLoader. It constructed a MetadataImage object 
from records supplied by
    the Raft layer.
    
    MetadataLoader is a rewrite of this code with several improvements.
    
    - It is in Java, and not part of the core gradle module.
    
    - It is not broker-specific. This is especially useful in combined mode 
where it means we do not
      have to have a separate, controller-specific code path for creating 
MetadataImage objects.
    
    - It supports multiple publishers rather than a single publisher. The 
publishers can be dynamically
      added and removed.
    
    This PR also removes BrokerMetadataSnapshotter in favor of the code path 
which the controller is
    already using, org.apache.kafka.image.publisher.SnapshotGenerator. This 
means we only have to
    maintain one snapshotter, rather than two.
    
    Reviewers: David Arthur <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala | 115 +++---
 .../main/scala/kafka/server/ControllerServer.scala |   2 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  60 ++-
 .../server/metadata/BrokerMetadataListener.scala   | 395 --------------------
 .../server/metadata/BrokerMetadataPublisher.scala  |  27 +-
 .../metadata/BrokerMetadataSnapshotter.scala       | 141 -------
 .../kafka/server/metadata/MetadataPublisher.scala  |  38 --
 .../metadata/BrokerMetadataListenerTest.scala      | 413 ---------------------
 .../metadata/BrokerMetadataPublisherTest.scala     |  17 +-
 .../metadata/BrokerMetadataSnapshotterTest.scala   | 129 -------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   2 +-
 11 files changed, 105 insertions(+), 1234 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 6b1bc710f39..b73eaa12fee 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,8 +25,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
-import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, 
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, 
DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, 
ScramPublisher, SnapshotWriterBuilder}
+import kafka.server.metadata.{BrokerMetadataPublisher, 
ClientQuotaMetadataManager, DynamicClientQuotaPublisher, 
DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
 import kafka.utils.CoreUtils
 import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,16 +37,15 @@ import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
 import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft
-import org.apache.kafka.raft.{RaftClient, RaftConfig}
+import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.snapshot.SnapshotWriter
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 
 import java.net.InetAddress
@@ -56,20 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, 
TimeoutException}
 import scala.collection.{Map, Seq}
-import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 
-class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
-    extends SnapshotWriterBuilder {
-  override def build(committedOffset: Long,
-                     committedEpoch: Int,
-                     lastContainedLogTime: Long): 
Option[SnapshotWriter[ApiMessageAndVersion]] = {
-    val snapshotId = new raft.OffsetAndEpoch(committedOffset + 1, 
committedEpoch)
-    raftClient.createSnapshot(snapshotId, lastContainedLogTime).asScala
-  }
-}
-
 /**
  * A Kafka broker that runs in KRaft (Kafka Raft) mode.
  */
@@ -146,11 +133,7 @@ class BrokerServer(
 
   val clusterId: String = sharedServer.metaProps.clusterId
 
-  var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
-
-  var metadataListener: BrokerMetadataListener = _
-
-  var metadataPublisher: BrokerMetadataPublisher = _
+  var brokerMetadataPublisher: BrokerMetadataPublisher = _
 
   val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
 
@@ -317,25 +300,6 @@ class BrokerServer(
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, 
quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
-      if (!config.processRoles.contains(ControllerRole)) {
-        // If no controller is defined, we rely on the broker to generate 
snapshots.
-        metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
-          config.nodeId,
-          time,
-          threadNamePrefix,
-          new BrokerSnapshotWriterBuilder(raftManager.client)
-        ))
-      }
-
-      metadataListener = new BrokerMetadataListener(
-        config.nodeId,
-        time,
-        threadNamePrefix,
-        config.metadataSnapshotMaxNewRecordBytes,
-        metadataSnapshotter,
-        sharedServer.brokerMetrics,
-        sharedServer.metadataLoaderFaultHandler)
-
       val networkListeners = new ListenerCollection()
       config.effectiveAdvertisedListeners.foreach { ep =>
         networkListeners.add(new Listener().
@@ -360,15 +324,17 @@ class BrokerServer(
         config.brokerSessionTimeoutMs / 2 // KAFKA-14392
       )
       lifecycleManager.start(
-        () => metadataListener.highestMetadataOffset,
+        () => sharedServer.loader.lastAppliedOffset(),
         brokerLifecycleChannelManager,
         sharedServer.metaProps.clusterId,
         networkListeners,
         featuresRemapped
       )
-
-      // Register a listener with the Raft layer to receive metadata event 
notifications
-      raftManager.register(metadataListener)
+      // If the BrokerLifecycleManager's initial catch-up future fails, it 
means we timed out
+      // or are shutting down before we could catch up. Therefore, also fail 
the firstPublishFuture.
+      lifecycleManager.initialCatchUpFuture.whenComplete((_, e) => {
+        if (e != null) 
brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
+      })
 
       val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
       var interBrokerListener: Endpoint = null
@@ -438,12 +404,8 @@ class BrokerServer(
         config.numIoThreads, 
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
         DataPlaneAcceptor.ThreadPrefix)
 
-      FutureUtils.waitWithLogging(logger.underlying, logIdent,
-        "broker metadata to catch up",
-        lifecycleManager.initialCatchUpFuture, startupDeadline, time)
-
-      // Apply the metadata log changes that we've accumulated.
-      metadataPublisher = new BrokerMetadataPublisher(config,
+      val publishers = new util.ArrayList[MetadataPublisher]()
+      brokerMetadataPublisher = new BrokerMetadataPublisher(config,
         metadataCache,
         logManager,
         replicaManager,
@@ -467,28 +429,42 @@ class BrokerServer(
         authorizer,
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler)
+      publishers.add(brokerMetadataPublisher)
 
-      // Add all reconfigurables for config change notification before 
installing the metadata publisher.
+      // Register parts of the broker that can be reconfigured via dynamic 
configs.  This needs to
+      // be done before we publish the dynamic configs, so that we don't miss 
anything.
       config.dynamicConfig.addReconfigurables(this)
 
-      // Tell the metadata listener to start publishing its output, and wait 
for the first
-      // publish operation to complete. This first operation will initialize 
logManager,
-      // replicaManager, groupCoordinator, and txnCoordinator. The log manager 
may perform
-      // a potentially lengthy recovery-from-unclean-shutdown operation here, 
if required.
+      // Install all the metadata publishers.
+      FutureUtils.waitWithLogging(logger.underlying, logIdent,
+        "the broker metadata publishers to be installed",
+        sharedServer.loader.installPublishers(publishers), startupDeadline, 
time)
+
+      // Wait for this broker to contact the quorum, and for the active 
controller to acknowledge
+      // us as caught up. It will do this by returning a heartbeat response 
with isCaughtUp set to
+      // true. The BrokerLifecycleManager tracks this.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
-        "the broker to catch up with the current cluster metadata",
-        metadataListener.startPublishing(metadataPublisher), startupDeadline, 
time)
+        "the controller to acknowledge that we are caught up",
+        lifecycleManager.initialCatchUpFuture, startupDeadline, time)
 
-      // Log static broker configurations.
+      // Wait for the first metadata update to be published. Metadata updates 
are not published
+      // until we read at least up to the high water mark of the cluster 
metadata partition.
+      // Usually, we publish the initial metadata before 
lifecycleManager.initialCatchUpFuture
+      // is completed, so this check is not necessary. But this is a simple 
check to make
+      // completely sure.
+      FutureUtils.waitWithLogging(logger.underlying, logIdent,
+        "the initial broker metadata update to be published",
+        brokerMetadataPublisher.firstPublishFuture , startupDeadline, time)
+
+      // Now that we have loaded some metadata, we can log a reasonably 
up-to-date broker
+      // configuration.  Keep in mind that KafkaConfig.originals is a mutable 
field that gets set
+      // by the dynamic configuration publisher. Ironically, 
KafkaConfig.originals does not
+      // contain the original configuration values.
       new KafkaConfig(config.originals(), true)
 
       // Start RemoteLogManager before broker start serving the requests.
       remoteLogManager.foreach(_.startup())
 
-      // Enable inbound TCP connections. Each endpoint will be started only 
once its matching
-      // authorizer future is completed.
-      val socketServerFuture = 
socketServer.enableRequestProcessing(authorizerFutures)
-
       // If we are using a ClusterMetadataAuthorizer which stores its ACLs in 
the metadata log,
       // notify it that the loading process is complete.
       authorizer match {
@@ -503,7 +479,11 @@ class BrokerServer(
         "the broker to be unfenced",
         lifecycleManager.setReadyToUnfence(), startupDeadline, time)
 
-      // Block here until all the authorizer futures are complete
+      // Enable inbound TCP connections. Each endpoint will be started only 
once its matching
+      // authorizer future is completed.
+      val enableRequestProcessingFuture = 
socketServer.enableRequestProcessing(authorizerFutures)
+
+      // Block here until all the authorizer futures are complete.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
         "all of the authorizer futures to be completed",
         CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), 
startupDeadline, time)
@@ -511,7 +491,7 @@ class BrokerServer(
       // Wait for all the SocketServer ports to be open, and the Acceptors to 
be started.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
         "all of the SocketServer Acceptors to be started",
-        socketServerFuture, startupDeadline, time)
+        enableRequestProcessingFuture, startupDeadline, time)
 
       maybeChangeStatus(STARTING, STARTED)
     } catch {
@@ -556,9 +536,6 @@ class BrokerServer(
         }
       }
 
-      if (metadataListener != null)
-        metadataListener.beginShutdown()
-
       lifecycleManager.beginShutdown()
 
       // Stop socket server to stop accepting any more connections and 
requests.
@@ -573,10 +550,6 @@ class BrokerServer(
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
       CoreUtils.swallow(authorizer.foreach(_.close()), this)
-      if (metadataListener != null) {
-        CoreUtils.swallow(metadataListener.close(), this)
-      }
-      metadataSnapshotter.foreach(snapshotter => 
CoreUtils.swallow(snapshotter.close(), this))
 
       /**
        * We must shutdown the scheduler early because otherwise, the scheduler 
could touch other
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index f03a05a88b0..282787ac49e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -341,7 +341,7 @@ class ControllerServer(
 
       // Install all metadata publishers.
       FutureUtils.waitWithLogging(logger.underlying, logIdent,
-        "all of the metadata publishers to be installed",
+        "the controller metadata publishers to be installed",
         sharedServer.loader.installPublishers(publishers), startupDeadline, 
time)
     } catch {
       case e: Throwable =>
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index ed1953601bd..554207d8c21 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -249,37 +249,35 @@ class SharedServer(
         )
         raftManager.startup()
 
-        if (sharedServerConfig.processRoles.contains(ControllerRole)) {
-          val loaderBuilder = new MetadataLoader.Builder().
-            setNodeId(metaProps.nodeId).
-            setTime(time).
-            setThreadNamePrefix(threadNamePrefix.getOrElse("")).
-            setFaultHandler(metadataLoaderFaultHandler).
-            setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
-          if (brokerMetrics != null) {
-            loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
-          }
-          loader = loaderBuilder.build()
-          snapshotEmitter = new SnapshotEmitter.Builder().
-            setNodeId(metaProps.nodeId).
-            setRaftClient(raftManager.client).
-            build()
-          snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
-            setNodeId(metaProps.nodeId).
-            setTime(time).
-            setFaultHandler(metadataPublishingFaultHandler).
-            
setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
-            
setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)).
-            setDisabledReason(snapshotsDiabledReason).
-            build()
-          raftManager.register(loader)
-          try {
-            
loader.installPublishers(Collections.singletonList(snapshotGenerator))
-          } catch {
-            case t: Throwable => {
-              error("Unable to install metadata publishers", t)
-              throw new RuntimeException("Unable to install metadata 
publishers.", t)
-            }
+        val loaderBuilder = new MetadataLoader.Builder().
+          setNodeId(metaProps.nodeId).
+          setTime(time).
+          setThreadNamePrefix(threadNamePrefix.getOrElse("")).
+          setFaultHandler(metadataLoaderFaultHandler).
+          setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+        if (brokerMetrics != null) {
+          loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
+        }
+        loader = loaderBuilder.build()
+        snapshotEmitter = new SnapshotEmitter.Builder().
+          setNodeId(metaProps.nodeId).
+          setRaftClient(raftManager.client).
+          build()
+        snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
+          setNodeId(metaProps.nodeId).
+          setTime(time).
+          setFaultHandler(metadataPublishingFaultHandler).
+          
setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
+          
setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(sharedServerConfig.metadataSnapshotMaxIntervalMs)).
+          setDisabledReason(snapshotsDiabledReason).
+          build()
+        raftManager.register(loader)
+        try {
+          
loader.installPublishers(Collections.singletonList(snapshotGenerator))
+        } catch {
+          case t: Throwable => {
+            error("Unable to install metadata publishers", t)
+            throw new RuntimeException("Unable to install metadata 
publishers.", t)
           }
         }
         debug("Completed SharedServer startup.")
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
deleted file mode 100644
index a12f25de471..00000000000
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import kafka.utils.Logging
-
-import java.util
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CompletableFuture
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.fault.FaultHandler
-import org.apache.kafka.snapshot.SnapshotReader
-
-import java.util.concurrent.TimeUnit.NANOSECONDS
-import scala.compat.java8.OptionConverters._
-
-
-class BrokerMetadataListener(
-  val brokerId: Int,
-  time: Time,
-  threadNamePrefix: Option[String],
-  val maxBytesBetweenSnapshots: Long,
-  val snapshotter: Option[MetadataSnapshotter],
-  brokerMetrics: BrokerServerMetrics,
-  _metadataLoadingFaultHandler: FaultHandler
-) extends RaftClient.Listener[ApiMessageAndVersion] with Logging {
-
-  private val metadataFaultOccurred = new AtomicBoolean(false)
-  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
-    override def handleFault(failureMessage: String, cause: Throwable): 
RuntimeException = {
-      // If the broker has any kind of error handling metadata records or 
publishing a new image
-      // we will disable taking new snapshots in order to preserve the local 
metadata log. Once we
-      // encounter a metadata processing error, the broker will be in an 
undetermined state.
-      if (metadataFaultOccurred.compareAndSet(false, true)) {
-        error("Disabling metadata snapshots until this broker is restarted.")
-      }
-      _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
-    }
-  }
-
-  private val logContext = new LogContext(s"[BrokerMetadataListener 
id=$brokerId] ")
-  private val log = logContext.logger(classOf[BrokerMetadataListener])
-  logIdent = logContext.logPrefix()
-
-  /**
-   * The highest metadata offset that we've seen.  Written only from the event 
queue thread.
-   */
-  @volatile var _highestOffset = -1L
-
-  /**
-   * The highest metadata epoch that we've seen.  Written only from the event 
queue thread.
-   */
-  private var _highestEpoch = -1
-
-  /**
-   * The highest metadata log time that we've seen. Written only from the 
event queue thread.
-   */
-  private var _highestTimestamp = -1L
-
-  private def provenance(): MetadataProvenance =
-    new MetadataProvenance(_highestOffset, _highestEpoch, _highestTimestamp)
-
-  /**
-   * The current broker metadata image. Accessed only from the event queue 
thread.
-   */
-  private var _image = MetadataImage.EMPTY
-
-  /**
-   * The current metadata delta. Accessed only from the event queue thread.
-   */
-  private var _delta = new MetadataDelta(_image)
-
-  /**
-   * The object to use to publish new metadata changes, or None if this 
listener has not
-   * been activated yet. Accessed only from the event queue thread.
-   */
-  private var _publisher: Option[MetadataPublisher] = None
-
-  /**
-   * The number of bytes of records that we have read  since the last snapshot 
we took.
-   * This does not include records we read from a snapshot.
-   * Accessed only from the event queue thread.
-   */
-  private var _bytesSinceLastSnapshot: Long = 0L
-
-  /**
-   * The event queue which runs this listener.
-   */
-  val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
-
-  /**
-   * Returns the highest metadata-offset. Thread-safe.
-   */
-  def highestMetadataOffset: Long = _highestOffset
-
-  /**
-   * Handle new metadata records.
-   */
-  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
-    eventQueue.append(new HandleCommitsEvent(reader))
-
-  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
-      extends EventQueue.FailureLoggingEvent(log) {
-    override def run(): Unit = {
-      val results = try {
-        val loadResults = loadBatches(_delta, reader, None, None, None, None)
-        if (isDebugEnabled) {
-          debug(s"Loaded new commits: $loadResults")
-        }
-        loadResults
-      } catch {
-        case e: Throwable =>
-          metadataLoadingFaultHandler.handleFault(s"Unable to load metadata 
commits " +
-            s"from the BatchReader starting at base offset 
${reader.baseOffset()}", e)
-          return
-      } finally {
-        reader.close()
-      }
-
-      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-      
-      val shouldTakeSnapshot: Set[SnapshotReason] = shouldSnapshot()
-      if (shouldTakeSnapshot.nonEmpty) {
-        maybeStartSnapshot(shouldTakeSnapshot)
-      }
-
-      _publisher.foreach(publish)
-    }
-  }
-
-  private def shouldSnapshot(): Set[SnapshotReason] = {
-    val maybeMetadataVersionChanged = metadataVersionChanged.toSet
-
-    if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
-      maybeMetadataVersionChanged + 
SnapshotReason.maxBytesExceeded(_bytesSinceLastSnapshot, 
maxBytesBetweenSnapshots)
-    } else {
-      maybeMetadataVersionChanged
-    }
-  }
-
-  private def metadataVersionChanged: Option[SnapshotReason] = {
-    // The _publisher is empty before starting publishing, and we won't 
compute feature delta
-    // until we starting publishing
-    if (_publisher.nonEmpty) {
-      Option(_delta.featuresDelta()).flatMap { featuresDelta =>
-        featuresDelta
-          .metadataVersionChange()
-          .asScala
-          .map(SnapshotReason.metadataVersionChanged)
-      }
-    } else {
-      None
-    }
-  }
-
-  private def maybeStartSnapshot(reason: Set[SnapshotReason]): Unit = {
-    snapshotter.foreach { snapshotter =>
-      if (metadataFaultOccurred.get()) {
-        trace("Not starting metadata snapshot since we previously had an 
error")
-      } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply(provenance()), reason)) {
-        _bytesSinceLastSnapshot = 0L
-      }
-    }
-  }
-
-  /**
-   * Handle metadata snapshots
-   */
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): 
Unit =
-    eventQueue.append(new HandleSnapshotEvent(reader))
-
-  class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
-    extends EventQueue.FailureLoggingEvent(log) {
-    override def run(): Unit = {
-      val snapshotName = 
s"${reader.snapshotId().offset}-${reader.snapshotId().epoch}"
-      try {
-        info(s"Loading snapshot ${snapshotName}")
-        _delta = new MetadataDelta(_image) // Discard any previous deltas.
-        val loadResults = loadBatches(_delta,
-          reader,
-          Some(reader.lastContainedLogTimestamp),
-          Some(reader.lastContainedLogOffset),
-          Some(reader.lastContainedLogEpoch),
-          Some(snapshotName))
-        try {
-          _delta.finishSnapshot()
-        } catch {
-          case e: Throwable => metadataLoadingFaultHandler.handleFault(
-              s"Error finishing snapshot ${snapshotName}", e)
-        }
-        info(s"Loaded snapshot ${snapshotName}: ${loadResults}")
-      } catch {
-        case t: Throwable => metadataLoadingFaultHandler.handleFault("Uncaught 
exception while " +
-          s"loading broker metadata from Metadata snapshot ${snapshotName}", t)
-      } finally {
-        reader.close()
-      }
-      _publisher.foreach(publish)
-    }
-  }
-
-  case class BatchLoadResults(numBatches: Int, numRecords: Int, elapsedUs: 
Long, numBytes: Long) {
-    override def toString: String = {
-      s"$numBatches batch(es) with $numRecords record(s) in $numBytes bytes " +
-        s"ending at offset $highestMetadataOffset in $elapsedUs microseconds"
-    }
-  }
-
-  /**
-   * Load and replay the batches to the metadata delta.
-   *
-   * When loading and replay a snapshot the appendTimestamp and snapshotId 
parameter should be provided.
-   * In a snapshot the append timestamp, offset and epoch reported by the 
batch is independent of the ones
-   * reported by the metadata log.
-   *
-   * @param delta metadata delta on which to replay the records
-   * @param iterator sequence of metadata record bacthes to replay
-   * @param lastAppendTimestamp optional append timestamp to use instead of 
the batches timestamp
-   * @param lastCommittedOffset optional offset to use instead of the batches 
offset
-   * @param lastCommittedEpoch optional epoch to use instead of the batches 
epoch
-   */
-  private def loadBatches(
-    delta: MetadataDelta,
-    iterator: util.Iterator[Batch[ApiMessageAndVersion]],
-    lastAppendTimestamp: Option[Long],
-    lastCommittedOffset: Option[Long],
-    lastCommittedEpoch: Option[Int],
-    snapshotName: Option[String]
-  ): BatchLoadResults = {
-    val startTimeNs = time.nanoseconds()
-    var numBatches = 0
-    var numRecords = 0
-    var numBytes = 0L
-
-    while (iterator.hasNext) {
-      val batch = iterator.next()
-
-      _highestEpoch = lastCommittedEpoch.getOrElse(batch.epoch())
-      _highestTimestamp = 
lastAppendTimestamp.getOrElse(batch.appendTimestamp())
-
-      var index = 0
-      batch.records().forEach { messageAndVersion =>
-        if (isTraceEnabled) {
-          trace(s"Metadata batch ${batch.lastOffset}: processing [${index + 
1}/${batch.records.size}]:" +
-            s" ${messageAndVersion.message}")
-        }
-        _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + 
index)
-        try {
-          delta.replay(messageAndVersion.message())
-        } catch {
-          case e: Throwable => snapshotName match {
-            case None => metadataLoadingFaultHandler.handleFault(
-              s"Error replaying metadata log record at offset 
${_highestOffset}", e)
-            case Some(name) => metadataLoadingFaultHandler.handleFault(
-              s"Error replaying record ${index} from snapshot ${name} at 
offset ${_highestOffset}", e)
-          }
-        } finally {
-          numRecords += 1
-          index += 1
-        }
-      }
-      numBytes = numBytes + batch.sizeInBytes()
-      brokerMetrics.updateBatchSize(batch.records().size())
-      numBatches = numBatches + 1
-    }
-
-    val endTimeNs = time.nanoseconds()
-    val elapsedNs = endTimeNs - startTimeNs
-    brokerMetrics.updateBatchProcessingTime(elapsedNs)
-    BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), 
numBytes)
-  }
-
-  def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = 
{
-    val event = new StartPublishingEvent(publisher)
-    eventQueue.append(event)
-    event.future
-  }
-
-  class StartPublishingEvent(publisher: MetadataPublisher)
-      extends EventQueue.FailureLoggingEvent(log) {
-    val future = new CompletableFuture[Void]()
-
-    override def run(): Unit = {
-      _publisher = Some(publisher)
-      log.info(s"Starting to publish metadata events at offset 
$highestMetadataOffset.")
-      try {
-        // Generate a snapshot if the metadata version changed
-        metadataVersionChanged.foreach(reason => 
maybeStartSnapshot(Set(reason)))
-        publish(publisher)
-        future.complete(null)
-      } catch {
-        case e: Throwable =>
-          future.completeExceptionally(e)
-          throw e
-      }
-    }
-  }
-
-  // This is used in tests to alter the publisher that is in use by the broker.
-  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
-    val event = new AlterPublisherEvent(publisher)
-    eventQueue.append(event)
-    event.future
-  }
-
-  class AlterPublisherEvent(publisher: MetadataPublisher)
-    extends EventQueue.FailureLoggingEvent(log) {
-    val future = new CompletableFuture[Void]()
-
-    override def run(): Unit = {
-      _publisher = Some(publisher)
-      log.info(s"Set publisher to ${publisher}")
-      future.complete(null)
-    }
-  }
-
-  private def publish(publisher: MetadataPublisher): Unit = {
-    val delta = _delta
-    try {
-      _image = _delta.apply(provenance())
-    } catch {
-      case t: Throwable =>
-        // If we cannot apply the delta, this publish event will throw and we 
will not publish a new image.
-        // The broker will continue applying metadata records and attempt to 
publish again.
-        throw metadataLoadingFaultHandler.handleFault(s"Error applying 
metadata delta $delta", t)
-    }
-
-    _delta = new MetadataDelta(_image)
-    if (isDebugEnabled) {
-      debug(s"Publishing new metadata delta $delta at offset 
${_image.highestOffsetAndEpoch().offset}.")
-    }
-
-    // This publish call is done with its own try-catch and fault handler
-    publisher.publish(delta, _image)
-
-    // Update the metrics since the publisher handled the lastest image
-    brokerMetrics.updateLastAppliedImageProvenance(_image.provenance())
-  }
-
-  override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
-    // Nothing to do.
-  }
-
-  override def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown")
-  }
-
-  def close(): Unit = {
-    beginShutdown()
-    eventQueue.close()
-  }
-
-  // VisibleForTesting
-  private[kafka] def getImageRecords(): 
CompletableFuture[util.List[ApiMessageAndVersion]] = {
-    val future = new CompletableFuture[util.List[ApiMessageAndVersion]]()
-    eventQueue.append(new GetImageRecordsEvent(future))
-    future
-  }
-
-  class GetImageRecordsEvent(future: 
CompletableFuture[util.List[ApiMessageAndVersion]])
-      extends EventQueue.FailureLoggingEvent(log) {
-    override def run(): Unit = {
-      val writer = new RecordListWriter()
-      val options = new ImageWriterOptions.Builder().
-        setMetadataVersion(_image.features().metadataVersion()).
-        build()
-      try {
-        _image.write(writer, options)
-      } finally {
-        writer.close()
-      }
-      future.complete(writer.records())
-    }
-  }
-}
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 4ad9f15beb8..e1bf2e89607 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -18,19 +18,22 @@
 package kafka.server.metadata
 
 import java.util.{OptionalInt, Properties}
-import java.util.concurrent.atomic.AtomicLong
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, 
TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.fault.FaultHandler
 
+import java.util.concurrent.CompletableFuture
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -123,11 +126,17 @@ class BrokerMetadataPublisher(
   var _firstPublish = true
 
   /**
-   * This is updated after all components (e.g. LogManager) has finished 
publishing the new metadata delta
+   * A future that is completed when we first publish.
    */
-  val publishedOffsetAtomic = new AtomicLong(-1)
+  val firstPublishFuture = new CompletableFuture[Void]
 
-  override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
+  override def name(): String = "BrokerMetadataPublisher"
+
+  override def onMetadataUpdate(
+    delta: MetadataDelta,
+    newImage: MetadataImage,
+    manifest: LoaderManifest
+  ): Unit = {
     val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
 
     val deltaName = if (_firstPublish) {
@@ -269,12 +278,12 @@ class BrokerMetadataPublisher(
       if (_firstPublish) {
         finishInitializingReplicaManager(newImage)
       }
-      publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
     } catch {
       case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
         s"publishing broker metadata from $deltaName", t)
     } finally {
       _firstPublish = false
+      firstPublishFuture.complete(null)
     }
   }
 
@@ -285,8 +294,6 @@ class BrokerMetadataPublisher(
     }
   }
 
-  override def publishedOffset: Long = publishedOffsetAtomic.get()
-
   def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
     config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
   }
@@ -392,5 +399,9 @@ class BrokerMetadataPublisher(
       case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
starting high " +
         "watermark checkpoint thread during startup", t)
     }
-}
+  }
+
+  override def close(): Unit = {
+    firstPublishFuture.completeExceptionally(new TimeoutException())
+  }
 }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
deleted file mode 100644
index b8828722559..00000000000
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import java.util.concurrent.RejectedExecutionException
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.image.MetadataImage
-import org.apache.kafka.image.writer.{ImageWriterOptions, RaftSnapshotWriter}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.snapshot.SnapshotWriter
-import scala.jdk.CollectionConverters._
-
-trait SnapshotWriterBuilder {
-  def build(committedOffset: Long,
-            committedEpoch: Int,
-            lastContainedLogTime: Long): 
Option[SnapshotWriter[ApiMessageAndVersion]]
-}
-
-class BrokerMetadataSnapshotter(
-  brokerId: Int,
-  val time: Time,
-  threadNamePrefix: Option[String],
-  writerBuilder: SnapshotWriterBuilder
-) extends Logging with MetadataSnapshotter {
-  /**
-   * The maximum number of records we will put in each batch.
-   *
-   * From the perspective of the Raft layer, the limit on batch size is 
specified in terms of
-   * bytes, not number of records. @See {@link 
KafkaRaftClient#MAX_BATCH_SIZE_BYTES} for details.
-   * However, it's more convenient to limit the batch size here in terms of 
number of records.
-   * So we chose a low number that will not cause problems.
-   */
-  private val maxRecordsInBatch = 1024
-
-  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=$brokerId] ")
-  logIdent = logContext.logPrefix()
-
-  /**
-   * The offset of the snapshot in progress, or -1 if there isn't one. 
Accessed only under
-   * the object lock.
-   */
-  private var _currentSnapshotOffset = -1L
-
-  /**
-   * The event queue which runs this listener.
-   */
-  val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""), new ShutdownEvent())
-
-  override def maybeStartSnapshot(
-    lastContainedLogTime: Long,
-    image: MetadataImage,
-    snapshotReasons: Set[SnapshotReason]
-  ): Boolean = synchronized {
-    if (_currentSnapshotOffset != -1) {
-      info(s"Declining to create a new snapshot at 
${image.highestOffsetAndEpoch} because " +
-        s"there is already a snapshot in progress at offset 
${_currentSnapshotOffset}")
-      false
-    } else {
-      val writer = writerBuilder.build(
-        image.highestOffsetAndEpoch().offset,
-        image.highestOffsetAndEpoch().epoch,
-        lastContainedLogTime
-      )
-      if (writer.nonEmpty) {
-        _currentSnapshotOffset = image.highestOffsetAndEpoch.offset
-
-        val snapshotReasonsMessage = 
SnapshotReason.stringFromReasons(snapshotReasons.asJava)
-        info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch} 
because: $snapshotReasonsMessage")
-        eventQueue.append(new CreateSnapshotEvent(image, writer.get))
-        true
-      } else {
-        info(s"Declining to create a new snapshot at 
${image.highestOffsetAndEpoch()} because " +
-          s"there is already a snapshot at offset 
${image.highestOffsetAndEpoch().offset}")
-        false
-      }
-    }
-  }
-
-  class CreateSnapshotEvent(
-    image: MetadataImage,
-    snapshotWriter: SnapshotWriter[ApiMessageAndVersion]
-  ) extends EventQueue.Event {
-
-    override def run(): Unit = {
-      val writer = new RaftSnapshotWriter(snapshotWriter, maxRecordsInBatch)
-      val options = new ImageWriterOptions.Builder().
-        setMetadataVersion(image.features().metadataVersion()).
-        build()
-      try {
-        image.write(writer, options)
-      } finally {
-        try {
-          writer.close()
-        } finally {
-          BrokerMetadataSnapshotter.this.synchronized {
-            _currentSnapshotOffset = -1L
-          }
-        }
-      }
-    }
-
-    override def handleException(e: Throwable): Unit = {
-      e match {
-        case _: RejectedExecutionException =>
-          info("Not processing CreateSnapshotEvent because the event queue is 
closed.")
-        case _ => error("Unexpected error handling CreateSnapshotEvent", e)
-      }
-    }
-  }
-
-  def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown");
-  }
-
-  class ShutdownEvent() extends EventQueue.Event {
-    override def run(): Unit = {
-    }
-  }
-
-  def close(): Unit = {
-    beginShutdown()
-    eventQueue.close()
-  }
-}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
deleted file mode 100644
index b63a2c056c0..00000000000
--- a/core/src/main/scala/kafka/server/metadata/MetadataPublisher.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server.metadata
-
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-
-/**
- * An object which publishes a new metadata image.
- */
-trait MetadataPublisher {
-  /**
-   * Publish a new metadata image.
-   *
-   * @param delta                  The delta between the old image and the new 
one.
-   * @param newImage               The new image, which is the result of 
applying the
-   *                               delta to the previous image.
-   */
-  def publish(delta: MetadataDelta, newImage: MetadataImage): Unit
-
-  /**
-   * The highest offset of metadata topic which has been published
-   */
-  def publishedOffset: Long
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
deleted file mode 100644
index e559a6b753d..00000000000
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util
-import java.util.concurrent.atomic.AtomicReference
-import java.util.{Collections, Optional}
-import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, 
VersionRange}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import scala.jdk.CollectionConverters._
-
-class BrokerMetadataListenerTest {
-  private val metadataLoadingFaultHandler = new MockFaultHandler("metadata 
loading")
-
-  @AfterEach
-  def verifyNoFaults(): Unit = {
-    metadataLoadingFaultHandler.maybeRethrowFirstException()
-  }
-
-  private def newBrokerMetadataListener(
-    metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
-    snapshotter: Option[MetadataSnapshotter] = None,
-    maxBytesBetweenSnapshots: Long = 1000000L,
-    faultHandler: FaultHandler = metadataLoadingFaultHandler
-  ): BrokerMetadataListener = {
-    new BrokerMetadataListener(
-      brokerId = 0,
-      time = Time.SYSTEM,
-      threadNamePrefix = None,
-      maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
-      snapshotter = snapshotter,
-      brokerMetrics = metrics,
-      _metadataLoadingFaultHandler = faultHandler
-    )
-  }
-
-  @Test
-  def testCreateAndClose(): Unit = {
-    val listener = newBrokerMetadataListener()
-    listener.close()
-  }
-
-  @Test
-  def testPublish(): Unit = {
-    val metrics = BrokerServerMetrics(new Metrics())
-    val listener = newBrokerMetadataListener(metrics = metrics)
-    try {
-      val unfencedTimestamp = 300L
-      listener.handleCommit(
-        RecordTestUtils.mockBatchReader(
-          100,
-          unfencedTimestamp,
-          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
-            setBrokerId(0).
-            setBrokerEpoch(100L).
-            setFenced(false).
-            setRack(null).
-            setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 
0.toShort))
-        )
-      )
-      val imageRecords = listener.getImageRecords().get()
-      assertEquals(0, imageRecords.size())
-      assertEquals(100L, listener.highestMetadataOffset)
-      assertEquals(-1L, metrics.lastAppliedOffset())
-      assertEquals(-1L, metrics.lastAppliedTimestamp())
-      assertEquals(0L, metrics.metadataLoadErrorCount.get)
-      assertEquals(0L, metrics.metadataApplyErrorCount.get)
-
-      val fencedTimestamp = 500L
-      val fencedLastOffset = 200L
-      listener.handleCommit(
-        RecordTestUtils.mockBatchReader(
-          fencedLastOffset,
-          fencedTimestamp,
-          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
-            setBrokerId(1).
-            setBrokerEpoch(200L).
-            setFenced(true).
-            setRack(null).
-            setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 
0.toShort))
-        )
-      )
-      listener.startPublishing(new MetadataPublisher {
-        override def publish(delta: MetadataDelta, newImage: MetadataImage): 
Unit = {
-          assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
-          assertEquals(new BrokerRegistration(0, 100L,
-            Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"), 
Collections.emptyList[Endpoint](),
-            Collections.emptyMap[String, VersionRange](), 
Optional.empty[String](), false, false),
-            delta.clusterDelta().broker(0))
-          assertEquals(new BrokerRegistration(1, 200L,
-            Uuid.fromString("QkOQtNKVTYatADcaJ28xDg"), 
Collections.emptyList[Endpoint](),
-            Collections.emptyMap[String, VersionRange](), 
Optional.empty[String](), true, false),
-            delta.clusterDelta().broker(1))
-        }
-
-        override def publishedOffset: Long = -1
-      }).get()
-
-      assertEquals(fencedLastOffset, metrics.lastAppliedOffset())
-      assertEquals(fencedTimestamp, metrics.lastAppliedTimestamp())
-      assertEquals(0L, metrics.metadataLoadErrorCount.get)
-      assertEquals(0L, metrics.metadataApplyErrorCount.get)
-    } finally {
-      listener.close()
-    }
-  }
-
-  class MockMetadataSnapshotter extends MetadataSnapshotter {
-    var image = MetadataImage.EMPTY
-    val failure = new AtomicReference[Throwable](null)
-    var activeSnapshotOffset = -1L
-    var prevCommittedOffset = -1L
-    var prevCommittedEpoch = -1
-    var prevLastContainedLogTime = -1L
-
-    override def maybeStartSnapshot(lastContainedLogTime: Long, newImage: 
MetadataImage, reason: Set[SnapshotReason]): Boolean = {
-      try {
-        if (activeSnapshotOffset == -1L) {
-          assertTrue(prevCommittedOffset <= 
newImage.highestOffsetAndEpoch().offset)
-          assertTrue(prevCommittedEpoch <= 
newImage.highestOffsetAndEpoch().epoch)
-          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
-          prevCommittedOffset = newImage.highestOffsetAndEpoch().offset
-          prevCommittedEpoch = newImage.highestOffsetAndEpoch().epoch
-          prevLastContainedLogTime = lastContainedLogTime
-          image = newImage
-          activeSnapshotOffset = newImage.highestOffsetAndEpoch().offset
-          true
-        } else {
-          false
-        }
-      } catch {
-        case t: Throwable => failure.compareAndSet(null, t)
-      }
-    }
-  }
-
-  class MockMetadataPublisher extends MetadataPublisher {
-    var image = MetadataImage.EMPTY
-
-    override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit 
= {
-      image = newImage
-    }
-
-    override def publishedOffset: Long = -1
-  }
-
-  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
-  private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
-
-  private def generateManyRecords(listener: BrokerMetadataListener,
-                                  endOffset: Long): Unit = {
-    (0 to 10000).foreach { _ =>
-      listener.handleCommit(
-        RecordTestUtils.mockBatchReader(
-          endOffset,
-          0,
-          util.Arrays.asList(
-            new ApiMessageAndVersion(new PartitionChangeRecord().
-              setPartitionId(0).
-              setTopicId(FOO_ID).
-              setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
-            new ApiMessageAndVersion(new PartitionChangeRecord().
-              setPartitionId(0).
-              setTopicId(FOO_ID).
-              setRemovingReplicas(Collections.emptyList()), 0.toShort)
-          )
-        )
-      )
-    }
-    listener.getImageRecords().get()
-  }
-
-  private def generateBadRecords(listener: BrokerMetadataListener,
-                                endOffset: Long): Unit = {
-    listener.handleCommit(
-      RecordTestUtils.mockBatchReader(
-        endOffset,
-        0,
-        util.Arrays.asList(
-          new ApiMessageAndVersion(new PartitionChangeRecord().
-            setPartitionId(0).
-            setTopicId(BAR_ID).
-            setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
-          new ApiMessageAndVersion(new PartitionChangeRecord().
-            setPartitionId(0).
-            setTopicId(BAR_ID).
-            setRemovingReplicas(Collections.emptyList()), 0.toShort)
-        )
-      )
-    )
-    listener.getImageRecords().get()
-  }
-
-  @Test
-  def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
-    val listener = newBrokerMetadataListener(maxBytesBetweenSnapshots = 1000L)
-    try {
-      val brokerIds = 0 to 3
-
-      registerBrokers(listener, brokerIds, endOffset = 100L)
-      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 
200L)
-      listener.getImageRecords().get()
-      assertEquals(200L, listener.highestMetadataOffset)
-
-      generateManyRecords(listener, endOffset = 1000L)
-      assertEquals(1000L, listener.highestMetadataOffset)
-    } finally {
-      listener.close()
-    }
-  }
-
-  @Test
-  def testCreateSnapshot(): Unit = {
-    val snapshotter = new MockMetadataSnapshotter()
-    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
-      maxBytesBetweenSnapshots = 1000L)
-    try {
-      val brokerIds = 0 to 3
-
-      registerBrokers(listener, brokerIds, endOffset = 100L)
-      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 
200L)
-      listener.getImageRecords().get()
-      assertEquals(200L, listener.highestMetadataOffset)
-
-      // Check that we generate at least one snapshot once we see enough 
records.
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      generateManyRecords(listener, 1000L)
-      assertEquals(1000L, snapshotter.prevCommittedOffset)
-      assertEquals(1000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
-
-      // Test creating a new snapshot after publishing it.
-      val publisher = new MockMetadataPublisher()
-      listener.startPublishing(publisher).get()
-      generateManyRecords(listener, 2000L)
-      listener.getImageRecords().get()
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      assertEquals(2000L, snapshotter.prevCommittedOffset)
-
-      // Test how we handle the snapshotter returning false.
-      generateManyRecords(listener, 3000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      generateManyRecords(listener, 4000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
-      generateManyRecords(listener, 5000L)
-      assertEquals(5000L, snapshotter.activeSnapshotOffset)
-      assertEquals(null, snapshotter.failure.get())
-    } finally {
-      listener.close()
-    }
-  }
-
-  @Test
-  def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
-    val snapshotter = new MockMetadataSnapshotter()
-    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
-      maxBytesBetweenSnapshots = 1000L)
-
-    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), 100L)
-    listener.getImageRecords().get()
-    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate 
snapshot on metadata version change before starting publishing")
-  }
-
-  @Test
-  def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
-    val snapshotter = new MockMetadataSnapshotter()
-    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
-      maxBytesBetweenSnapshots = 1000L)
-
-    val endOffset = 100L
-    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), endOffset)
-    listener.startPublishing(new MockMetadataPublisher()).get()
-    assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try 
to generate snapshot when starting publishing")
-  }
-
-  @Test
-  def testSnapshotAfterMetadataVersionChange(): Unit = {
-    val snapshotter = new MockMetadataSnapshotter()
-    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
-      maxBytesBetweenSnapshots = 1000L)
-    listener.startPublishing(new MockMetadataPublisher()).get()
-
-    val endOffset = 100L
-    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
(MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
-    // Waiting for the metadata version update to get processed
-    listener.getImageRecords().get()
-    assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should 
generate snapshot on feature update")
-  }
-
-  @Test
-  def testNoSnapshotAfterError(): Unit = {
-    val snapshotter = new MockMetadataSnapshotter()
-    val faultHandler = new MockFaultHandler("metadata loading")
-
-    val listener = newBrokerMetadataListener(
-      snapshotter = Some(snapshotter),
-      maxBytesBetweenSnapshots = 1000L,
-      faultHandler = faultHandler)
-    try {
-      val brokerIds = 0 to 3
-
-      registerBrokers(listener, brokerIds, endOffset = 100L)
-      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 
200L)
-      listener.getImageRecords().get()
-      assertEquals(200L, listener.highestMetadataOffset)
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      assertEquals(-1L, snapshotter.activeSnapshotOffset)
-
-      // Append invalid records that will normally trigger a snapshot
-      generateBadRecords(listener, 1000L)
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      assertEquals(-1L, snapshotter.activeSnapshotOffset)
-
-      // Generate some records that will not throw an error, verify still no 
snapshots
-      generateManyRecords(listener, 2000L)
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      assertEquals(-1L, snapshotter.activeSnapshotOffset)
-    } finally {
-      listener.close()
-    }
-  }
-
-  private def registerBrokers(
-    listener: BrokerMetadataListener,
-    brokerIds: Iterable[Int],
-    endOffset: Long
-  ): Unit = {
-    brokerIds.foreach { brokerId =>
-      listener.handleCommit(
-        RecordTestUtils.mockBatchReader(
-          endOffset,
-          0,
-          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
-            setBrokerId(brokerId).
-            setBrokerEpoch(100L).
-            setFenced(false).
-            setRack(null).
-            setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + 
brokerId)), 0.toShort))
-        )
-      )
-    }
-  }
-
-  private def createTopicWithOnePartition(
-    listener: BrokerMetadataListener,
-    replicas: Seq[Int],
-    endOffset: Long
-  ): Unit = {
-    listener.handleCommit(
-      RecordTestUtils.mockBatchReader(
-        endOffset,
-        0,
-        util.Arrays.asList(
-          new ApiMessageAndVersion(new TopicRecord().
-            setName("foo").
-            setTopicId(FOO_ID), 0.toShort),
-          new ApiMessageAndVersion(new PartitionRecord().
-            setPartitionId(0).
-            setTopicId(FOO_ID).
-            setIsr(replicas.map(Int.box).asJava).
-            setLeader(0).
-            setReplicas(replicas.map(Int.box).asJava), 0.toShort)
-        )
-      )
-    )
-  }
-
-  private def updateFeature(
-    listener: BrokerMetadataListener,
-    feature: String,
-    version: Short,
-    endOffset: Long
-  ): Unit = {
-    listener.handleCommit(
-      RecordTestUtils.mockBatchReader(
-        endOffset,
-        0,
-        util.Arrays.asList(
-          new ApiMessageAndVersion(new FeatureLevelRecord().
-            setName(feature).
-            setFeatureLevel(version), 0.toShort)
-        )
-      )
-    )
-  }
-
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index d4cdda431ed..40513bba270 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -33,9 +33,11 @@ import 
org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.utils.Exit
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupCoordinator
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataImageTest, MetadataProvenance, TopicImage, TopicsImage}
+import org.apache.kafka.image.loader.LogDeltaManifest
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
+import org.apache.kafka.raft.LeaderAndEpoch
 import org.apache.kafka.server.fault.FaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -45,6 +47,7 @@ import org.mockito.Mockito.{doThrow, mock, verify}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
+import java.util.concurrent.TimeUnit
 import scala.jdk.CollectionConverters._
 
 class BrokerMetadataPublisherTest {
@@ -208,7 +211,7 @@ class BrokerMetadataPublisherTest {
         thenAnswer(new Answer[Unit]() {
           override def answer(invocation: InvocationOnMock): Unit = 
numTimesReloadCalled.addAndGet(1)
         })
-      broker.metadataPublisher.dynamicConfigPublisher = publisher
+      broker.brokerMetadataPublisher.dynamicConfigPublisher = publisher
       val admin = Admin.create(cluster.clientProperties())
       try {
         assertEquals(0, numTimesReloadCalled.get())
@@ -246,11 +249,12 @@ class BrokerMetadataPublisherTest {
       cluster.waitForReadyBrokers()
       val broker = cluster.brokers().values().iterator().next()
       TestUtils.retry(60000) {
-        assertNotNull(broker.metadataPublisher)
+        assertNotNull(broker.brokerMetadataPublisher)
       }
-      val publisher = Mockito.spy(broker.metadataPublisher)
+      val publisher = Mockito.spy(broker.brokerMetadataPublisher)
       doThrow(new RuntimeException("injected 
failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any())
-      broker.metadataListener.alterPublisher(publisher).get()
+      
broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1,
 TimeUnit.MINUTES)
+      
broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1, 
TimeUnit.MINUTES)
       val admin = Admin.create(cluster.clientProperties())
       try {
         admin.createTopics(singletonList(new NewTopic("foo", 1, 
1.toShort))).all().get()
@@ -296,7 +300,8 @@ class BrokerMetadataPublisherTest {
       .setImage(image)
       .build()
 
-    metadataPublisher.publish(delta, image)
+    metadataPublisher.onMetadataUpdate(delta, image,
+      new LogDeltaManifest(MetadataProvenance.EMPTY, LeaderAndEpoch.UNKNOWN, 
1, 100, 42));
 
     verify(groupCoordinator).onNewMetadataImage(image, delta)
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
deleted file mode 100644
index c086c3dd8b2..00000000000
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.nio.ByteBuffer
-import java.util.Optional
-import java.util.concurrent.{CompletableFuture, CountDownLatch}
-import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.protocol.ByteBufferAccessor
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
-import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.metadata.util.SnapshotReason
-import org.apache.kafka.queue.EventQueue
-import org.apache.kafka.raft.OffsetAndEpoch
-import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.snapshot.{MockRawSnapshotWriter, RecordsSnapshotWriter, SnapshotWriter}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.Timeout
-import org.junit.jupiter.api.Test
-
-import java.util
-import scala.compat.java8.OptionConverters._
-
-class BrokerMetadataSnapshotterTest {
-  @Test
-  def testCreateAndClose(): Unit = {
-    val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
-      (_, _, _) => throw new RuntimeException("unimplemented"))
-    snapshotter.close()
-  }
-
-  class MockSnapshotWriterBuilder extends SnapshotWriterBuilder {
-    var image = new CompletableFuture[MetadataImage]
-
-    override def build(committedOffset: Long,
-                       committedEpoch: Int,
-                       lastContainedLogTime: Long): Option[SnapshotWriter[ApiMessageAndVersion]] = {
-      val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
-      RecordsSnapshotWriter.createWithHeader(
-        () => {
-          Optional.of(
-            new MockRawSnapshotWriter(offsetAndEpoch, consumeSnapshotBuffer(committedOffset, committedEpoch, lastContainedLogTime))
-          )
-        },
-        4096,
-        MemoryPool.NONE,
-        Time.SYSTEM,
-        lastContainedLogTime,
-        CompressionType.NONE,
-        MetadataRecordSerde.INSTANCE
-      ).asScala
-    }
-
-    def consumeSnapshotBuffer(
-      committedOffset: Long,
-      committedEpoch: Int,
-      lastContainedLogTime: Long
-    )(buffer: ByteBuffer): Unit = {
-      val delta = new MetadataDelta(MetadataImage.EMPTY)
-      val memoryRecords = MemoryRecords.readableRecords(buffer)
-      val batchIterator = memoryRecords.batchIterator()
-      while (batchIterator.hasNext) {
-        val batch = batchIterator.next()
-        if (!batch.isControlBatch()) {
-          batch.forEach(record => {
-            val recordBuffer = record.value().duplicate()
-            val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
-              new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
-            delta.replay(messageAndVersion.message())
-          })
-        }
-      }
-      image.complete(delta.apply(new MetadataProvenance(committedOffset, committedEpoch, lastContainedLogTime)))
-    }
-  }
-
-  class BlockingEvent extends EventQueue.Event {
-    val latch = new CountDownLatch(1)
-    override def run(): Unit = latch.await()
-  }
-
-  @Test
-  @Timeout(30)
-  def testCreateSnapshot(): Unit = {
-    val writerBuilder = new MockSnapshotWriterBuilder()
-    val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)
-
-    try {
-      val blockingEvent = new BlockingEvent()
-      val reasons = Set(SnapshotReason.UNKNOWN)
-
-      snapshotter.eventQueue.append(blockingEvent)
-      assertTrue(snapshotter.maybeStartSnapshot(2000L, MetadataImageTest.IMAGE1, reasons))
-      assertFalse(snapshotter.maybeStartSnapshot(4000L, MetadataImageTest.IMAGE2, reasons))
-      blockingEvent.latch.countDown()
-      assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
-    } finally {
-      snapshotter.close()
-    }
-  }
-
-  class MockSnapshotWriter extends SnapshotWriter[ApiMessageAndVersion] {
-    val batches = new util.ArrayList[util.List[ApiMessageAndVersion]]
-    override def snapshotId(): OffsetAndEpoch = new OffsetAndEpoch(0, 0)
-    override def lastContainedLogOffset(): Long = 0
-    override def lastContainedLogEpoch(): Int = 0
-    override def isFrozen: Boolean = false
-    override def append(batch: util.List[ApiMessageAndVersion]): Unit = batches.add(batch)
-    override def freeze(): Unit = {}
-    override def close(): Unit = {}
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fd463420d17..e0b65b90cff 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1244,7 +1244,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(
       () => {
         brokers.forall { broker =>
-          val metadataOffset = broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+          val metadataOffset = broker.asInstanceOf[BrokerServer].sharedServer.loader.lastAppliedOffset()
           metadataOffset >= controllerOffset
         }
       }, msg)

Reply via email to