This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new f3cf6db3a53 KAFKA-14114: Add Metadata Error Related Metrics
f3cf6db3a53 is described below

commit f3cf6db3a53a032fb0480258b29ecf416e8385d3
Author: Niket Goel <[email protected]>
AuthorDate: Wed Jul 27 18:52:33 2022 -0700

    KAFKA-14114: Add Metadata Error Related Metrics
    
    This PR adds 3 metrics as described in KIP-859:
     kafka.server:type=broker-metadata-metrics,name=metadata-load-error-count
     kafka.server:type=broker-metadata-metrics,name=metadata-apply-error-count
     kafka.controller:type=KafkaController,name=MetadataErrorCount
    
    These metrics are incremented by fault handlers when the appropriate fault 
happens. Broker-side
    load errors happen in BrokerMetadataListener. Broker-side apply errors 
happen in the
    BrokerMetadataPublisher. The metric on the controller is incremented when 
the standby controller
    (not active) encounters a metadata error.
    
    In BrokerMetadataPublisher, try to limit the damage caused by an exception 
by introducing more
    catch blocks. The only fatal failures here are those that happen during 
initialization, when we
    initialize the manager objects (these would also be fatal in ZK mode).
    
    In BrokerMetadataListener, try to improve the logging of faults, especially 
ones that happen when
    replaying a snapshot. Try to limit the damage caused by an exception.
    
    Replace MetadataFaultHandler with LoggingFaultHandler, which is more 
flexible and takes a Runnable
    argument. Add LoggingFaultHandlerTest.
    
    Make QuorumControllerMetricsTest stricter. Fix a bug where we weren't 
cleaning up some metrics from
    the yammer registry on close in QuorumControllerMetrics.
    
    Co-authored-by: Colin P. McCabe <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  15 +-
 .../main/scala/kafka/server/ControllerServer.scala |   5 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  27 +-
 .../server/metadata/BrokerMetadataListener.scala   |  56 ++--
 .../server/metadata/BrokerMetadataPublisher.scala  | 300 +++++++++++++--------
 .../server/metadata/BrokerServerMetrics.scala      |  26 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  17 +-
 .../kafka/server/QuorumTestHarness.scala           |  34 ++-
 .../server/metadata/BrokerServerMetricsTest.scala  |  38 ++-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  25 +-
 .../metadata/BrokerMetadataListenerTest.scala      |  17 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  77 ++++--
 .../apache/kafka/controller/ControllerMetrics.java |   4 +
 .../apache/kafka/controller/QuorumController.java  |   8 +-
 .../kafka/controller/QuorumControllerMetrics.java  |  24 ++
 .../metadata/fault/MetadataFaultException.java     |  32 ---
 .../kafka/metadata/fault/MetadataFaultHandler.java |  36 ---
 .../kafka/controller/MockControllerMetrics.java    |  13 +
 .../controller/QuorumControllerMetricsTest.java    |  30 +++
 .../apache/kafka/server/fault/FaultHandler.java    |  25 +-
 .../kafka/server/fault/FaultHandlerException.java} |   8 +-
 .../kafka/server/fault/LoggingFaultHandler.java    |  54 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |   9 +-
 .../server/fault/LoggingFaultHandlerTest.java      |  57 ++++
 .../kafka/server/fault/MockFaultHandler.java       |  20 +-
 25 files changed, 669 insertions(+), 288 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0bdd6734975..1008decadb1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -50,6 +50,7 @@ import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.snapshot.SnapshotWriter
 
@@ -76,9 +77,13 @@ class BrokerServer(
   val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
+  val brokerMetrics: BrokerServerMetrics,
   val threadNamePrefix: Option[String],
   val initialOfflineDirs: Seq[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
+  val fatalFaultHandler: FaultHandler,
+  val metadataLoadingFaultHandler: FaultHandler,
+  val metadataPublishingFaultHandler: FaultHandler
 ) extends KafkaBroker {
 
   override def brokerState: BrokerState = Option(lifecycleManager).
@@ -315,8 +320,8 @@ class BrokerServer(
         threadNamePrefix,
         config.metadataSnapshotMaxNewRecordBytes,
         metadataSnapshotter,
-        BrokerServerMetrics(metrics)
-      )
+        brokerMetrics,
+        metadataLoadingFaultHandler)
 
       val networkListeners = new ListenerCollection()
       config.effectiveAdvertisedListeners.foreach { ep =>
@@ -432,7 +437,9 @@ class BrokerServer(
         transactionCoordinator,
         clientQuotaMetadataManager,
         dynamicConfigHandlers.toMap,
-        authorizer)
+        authorizer,
+        fatalFaultHandler,
+        metadataPublishingFaultHandler)
 
       // Tell the metadata listener to start publishing its output, and wait 
for the first
       // publish operation to complete. This first operation will initialize 
logManager,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 212c092e1ab..19a6e307d62 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -37,7 +37,7 @@ import 
org.apache.kafka.common.security.scram.internals.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{BootstrapMetadata, Controller, 
QuorumController, QuorumControllerMetrics, QuorumFeatures}
+import org.apache.kafka.controller.{BootstrapMetadata, Controller, 
ControllerMetrics, QuorumController, QuorumFeatures}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -61,6 +61,7 @@ class ControllerServer(
   val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
+  val controllerMetrics: ControllerMetrics,
   val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
   val configSchema: KafkaConfigSchema,
@@ -201,7 +202,7 @@ class ControllerServer(
           
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
           setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
           setMaxIdleIntervalNs(maxIdleIntervalNs).
-          setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)).
+          setMetrics(controllerMetrics).
           setCreateTopicPolicy(createTopicPolicy.asJava).
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e7cf8f8f1fa..2338ef5e7c4 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -23,17 +23,17 @@ import kafka.log.{LogConfig, UnifiedLog}
 import kafka.metrics.KafkaMetricsReporter
 import kafka.raft.KafkaRaftManager
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.server.metadata.BrokerServerMetrics
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
-import org.apache.kafka.controller.BootstrapMetadata
-import org.apache.kafka.metadata.fault.MetadataFaultHandler
+import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.ProcessExitingFaultHandler
+import org.apache.kafka.server.fault.{LoggingFaultHandler, 
ProcessExitingFaultHandler}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
@@ -83,34 +83,49 @@ class KafkaRaftServer(
   )
 
   private val broker: Option[BrokerServer] = if 
(config.processRoles.contains(BrokerRole)) {
+    val brokerMetrics = BrokerServerMetrics(metrics)
+    val fatalFaultHandler = new ProcessExitingFaultHandler()
+    val metadataLoadingFaultHandler = new LoggingFaultHandler("metadata 
loading",
+        () => brokerMetrics.metadataLoadErrorCount.getAndIncrement())
+    val metadataApplyingFaultHandler = new LoggingFaultHandler("metadata 
application",
+      () => brokerMetrics.metadataApplyErrorCount.getAndIncrement())
     Some(new BrokerServer(
       config,
       metaProps,
       raftManager,
       time,
       metrics,
+      brokerMetrics,
       threadNamePrefix,
       offlineDirs,
-      controllerQuorumVotersFuture
+      controllerQuorumVotersFuture,
+      fatalFaultHandler,
+      metadataLoadingFaultHandler,
+      metadataApplyingFaultHandler
     ))
   } else {
     None
   }
 
   private val controller: Option[ControllerServer] = if 
(config.processRoles.contains(ControllerRole)) {
+    val controllerMetrics = new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
+    val metadataFaultHandler = new LoggingFaultHandler("controller metadata",
+      () => controllerMetrics.incrementMetadataErrorCount())
+    val fatalFaultHandler = new ProcessExitingFaultHandler()
     Some(new ControllerServer(
       metaProps,
       config,
       raftManager,
       time,
       metrics,
+      controllerMetrics,
       threadNamePrefix,
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
       raftManager.apiVersions,
       bootstrapMetadata,
-      new MetadataFaultHandler(),
-      new ProcessExitingFaultHandler(),
+      metadataFaultHandler,
+      fatalFaultHandler
     ))
   } else {
     None
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 3b79526a954..3984f467edd 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -19,13 +19,13 @@ package kafka.server.metadata
 import java.util
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.function.Consumer
-
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
 
@@ -40,7 +40,8 @@ class BrokerMetadataListener(
   threadNamePrefix: Option[String],
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
-  brokerMetrics: BrokerServerMetrics
+  brokerMetrics: BrokerServerMetrics,
+  metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
   private val logContext = new LogContext(s"[BrokerMetadataListener 
id=$brokerId] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -109,11 +110,16 @@ class BrokerMetadataListener(
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
       val results = try {
-        val loadResults = loadBatches(_delta, reader, None, None, None)
+        val loadResults = loadBatches(_delta, reader, None, None, None, None)
         if (isDebugEnabled) {
           debug(s"Loaded new commits: $loadResults")
         }
         loadResults
+      } catch {
+        case e: Throwable =>
+          metadataLoadingFaultHandler.handleFault(s"Unable to load metadata 
commits " +
+            s"from the BatchReader starting at base offset 
${reader.baseOffset()}", e)
+          return
       } finally {
         reader.close()
       }
@@ -156,19 +162,26 @@ class BrokerMetadataListener(
   class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
     extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
+      val snapshotName = 
s"${reader.snapshotId().offset}-${reader.snapshotId().epoch}"
       try {
-        info(s"Loading snapshot 
${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        info(s"Loading snapshot ${snapshotName}")
         _delta = new MetadataDelta(_image) // Discard any previous deltas.
-        val loadResults = loadBatches(
-          _delta,
+        val loadResults = loadBatches(_delta,
           reader,
           Some(reader.lastContainedLogTimestamp),
           Some(reader.lastContainedLogOffset),
-          Some(reader.lastContainedLogEpoch)
-        )
-        _delta.finishSnapshot()
-        info(s"Loaded snapshot 
${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
-          s"$loadResults")
+          Some(reader.lastContainedLogEpoch),
+          Some(snapshotName))
+        try {
+          _delta.finishSnapshot()
+        } catch {
+          case e: Throwable => metadataLoadingFaultHandler.handleFault(
+              s"Error finishing snapshot ${snapshotName}", e)
+        }
+        info(s"Loaded snapshot ${snapshotName}: ${loadResults}")
+      } catch {
+        case t: Throwable => metadataLoadingFaultHandler.handleFault("Uncaught 
exception while " +
+          s"loading broker metadata from Metadata snapshot ${snapshotName}", t)
       } finally {
         reader.close()
       }
@@ -201,7 +214,8 @@ class BrokerMetadataListener(
     iterator: util.Iterator[Batch[ApiMessageAndVersion]],
     lastAppendTimestamp: Option[Long],
     lastCommittedOffset: Option[Long],
-    lastCommittedEpoch: Option[Int]
+    lastCommittedEpoch: Option[Int],
+    snapshotName: Option[String]
   ): BatchLoadResults = {
     val startTimeNs = time.nanoseconds()
     var numBatches = 0
@@ -220,12 +234,20 @@ class BrokerMetadataListener(
           trace(s"Metadata batch ${batch.lastOffset}: processing [${index + 
1}/${batch.records.size}]:" +
             s" ${messageAndVersion.message}")
         }
-
         _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + 
index)
-
-        delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
-        numRecords += 1
-        index += 1
+        try {
+          delta.replay(highestMetadataOffset, epoch, 
messageAndVersion.message())
+        } catch {
+          case e: Throwable => snapshotName match {
+            case None => metadataLoadingFaultHandler.handleFault(
+              s"Error replaying metadata log record at offset 
${_highestOffset}", e)
+            case Some(name) => metadataLoadingFaultHandler.handleFault(
+              s"Error replaying record ${index} from snapshot ${name} at 
offset ${_highestOffset}", e)
+          }
+        } finally {
+          numRecords += 1
+          index += 1
+        }
       }
       numBytes = numBytes + batch.sizeInBytes()
       metadataBatchSizeHist.update(batch.records().size())
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 212f188504e..0192bb4afcf 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, 
TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
 
 import scala.collection.mutable
 
@@ -94,15 +95,19 @@ object BrokerMetadataPublisher extends Logging {
   }
 }
 
-class BrokerMetadataPublisher(conf: KafkaConfig,
-                              metadataCache: KRaftMetadataCache,
-                              logManager: LogManager,
-                              replicaManager: ReplicaManager,
-                              groupCoordinator: GroupCoordinator,
-                              txnCoordinator: TransactionCoordinator,
-                              clientQuotaMetadataManager: 
ClientQuotaMetadataManager,
-                              dynamicConfigHandlers: Map[String, 
ConfigHandler],
-                              private val _authorizer: Option[Authorizer]) 
extends MetadataPublisher with Logging {
+class BrokerMetadataPublisher(
+  conf: KafkaConfig,
+  metadataCache: KRaftMetadataCache,
+  logManager: LogManager,
+  replicaManager: ReplicaManager,
+  groupCoordinator: GroupCoordinator,
+  txnCoordinator: TransactionCoordinator,
+  clientQuotaMetadataManager: ClientQuotaMetadataManager,
+  dynamicConfigHandlers: Map[String, ConfigHandler],
+  private val _authorizer: Option[Authorizer],
+  fatalFaultHandler: FaultHandler,
+  metadataPublishingFaultHandler: FaultHandler
+) extends MetadataPublisher with Logging {
   logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
 
   import BrokerMetadataPublisher._
@@ -125,8 +130,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
     val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
 
+    val deltaName = if (_firstPublish) {
+      s"initial MetadataDelta up to ${highestOffsetAndEpoch.offset}"
+    } else {
+      s"MetadataDelta up to ${highestOffsetAndEpoch.offset}"
+    }
     try {
-      trace(s"Publishing delta $delta with highest offset 
$highestOffsetAndEpoch")
+      if (isTraceEnabled) {
+        trace(s"Publishing delta $delta with highest offset 
$highestOffsetAndEpoch")
+      }
 
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
@@ -151,37 +163,50 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 
       // Apply topic deltas.
       Option(delta.topicsDelta()).foreach { topicsDelta =>
-        // Notify the replica manager about changes to topics.
-        replicaManager.applyDelta(topicsDelta, newImage)
-
-        // Update the group coordinator of local changes
-        updateCoordinator(
-          newImage,
-          delta,
-          Topic.GROUP_METADATA_TOPIC_NAME,
-          groupCoordinator.onElection,
-          groupCoordinator.onResignation
-        )
-
-        // Update the transaction coordinator of local changes
-        updateCoordinator(
-          newImage,
-          delta,
-          Topic.TRANSACTION_STATE_TOPIC_NAME,
-          txnCoordinator.onElection,
-          txnCoordinator.onResignation
-        )
-
-        // Notify the group coordinator about deleted topics.
-        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
-        topicsDelta.deletedTopicIds().forEach { id =>
-          val topicImage = topicsDelta.image().getTopic(id)
-          topicImage.partitions().keySet().forEach {
-            id => deletedTopicPartitions += new 
TopicPartition(topicImage.name(), id)
-          }
+        try {
+          // Notify the replica manager about changes to topics.
+          replicaManager.applyDelta(topicsDelta, newImage)
+        } catch {
+          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error applying topics " +
+            s"delta in ${deltaName}", t)
+        }
+        try {
+          // Update the group coordinator of local changes
+          updateCoordinator(newImage,
+            delta,
+            Topic.GROUP_METADATA_TOPIC_NAME,
+            groupCoordinator.onElection,
+            groupCoordinator.onResignation)
+        } catch {
+          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
+            s"coordinator with local changes in ${deltaName}", t)
+        }
+        try {
+          // Update the transaction coordinator of local changes
+          updateCoordinator(newImage,
+            delta,
+            Topic.TRANSACTION_STATE_TOPIC_NAME,
+            txnCoordinator.onElection,
+            txnCoordinator.onResignation)
+        } catch {
+          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating txn " +
+            s"coordinator with local changes in ${deltaName}", t)
         }
-        if (deletedTopicPartitions.nonEmpty) {
-          groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, 
RequestLocal.NoCaching)
+        try {
+          // Notify the group coordinator about deleted topics.
+          val deletedTopicPartitions = new 
mutable.ArrayBuffer[TopicPartition]()
+          topicsDelta.deletedTopicIds().forEach { id =>
+            val topicImage = topicsDelta.image().getTopic(id)
+            topicImage.partitions().keySet().forEach {
+              id => deletedTopicPartitions += new 
TopicPartition(topicImage.name(), id)
+            }
+          }
+          if (deletedTopicPartitions.nonEmpty) {
+            groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, 
RequestLocal.NoCaching)
+          }
+        } catch {
+          case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating group " +
+            s"coordinator with deleted partitions in ${deltaName}", t)
         }
       }
 
@@ -191,39 +216,62 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
           val props = newImage.configs().configProperties(resource)
           resource.`type`() match {
             case TOPIC =>
-              // Apply changes to a topic's dynamic configuration.
-              info(s"Updating topic ${resource.name()} with new configuration 
: " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Topic).
-                processConfigChanges(resource.name(), props)
+              try {
+                // Apply changes to a topic's dynamic configuration.
+                info(s"Updating topic ${resource.name()} with new 
configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Topic).
+                  processConfigChanges(resource.name(), props)
+              } catch {
+                case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating topic " +
+                  s"${resource.name()} with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
+                  s"in ${deltaName}", t)
+              }
             case BROKER =>
               if (resource.name().isEmpty) {
-                // Apply changes to "cluster configs" (also known as default 
BROKER configs).
-                // These are stored in KRaft with an empty name field.
-                info("Updating cluster configuration : " +
-                  toLoggableProps(resource, props).mkString(","))
-                dynamicConfigHandlers(ConfigType.Broker).
-                  processConfigChanges(ConfigEntityName.Default, props)
+                try {
+                  // Apply changes to "cluster configs" (also known as default 
BROKER configs).
+                  // These are stored in KRaft with an empty name field.
+                  info("Updating cluster configuration : " +
+                    toLoggableProps(resource, props).mkString(","))
+                  dynamicConfigHandlers(ConfigType.Broker).
+                    processConfigChanges(ConfigEntityName.Default, props)
+                } catch {
+                  case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating " +
+                    s"cluster with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
+                    s"in ${deltaName}", t)
+                }
               } else if (resource.name() == brokerId.toString) {
-                // Apply changes to this broker's dynamic configuration.
-                info(s"Updating broker $brokerId with new configuration : " +
-                  toLoggableProps(resource, props).mkString(","))
-                dynamicConfigHandlers(ConfigType.Broker).
-                  processConfigChanges(resource.name(), props)
-                // When applying a per broker config (not a cluster config), 
we also
-                // reload any associated file. For example, if the 
ssl.keystore is still
-                // set to /tmp/foo, we still want to reload /tmp/foo in case 
its contents
-                // have changed. This doesn't apply to topic configs or 
cluster configs.
-                reloadUpdatedFilesWithoutConfigChange(props)
+                try {
+                  // Apply changes to this broker's dynamic configuration.
+                  info(s"Updating broker $brokerId with new configuration : " +
+                    toLoggableProps(resource, props).mkString(","))
+                  dynamicConfigHandlers(ConfigType.Broker).
+                    processConfigChanges(resource.name(), props)
+                  // When applying a per broker config (not a cluster config), 
we also
+                  // reload any associated file. For example, if the 
ssl.keystore is still
+                  // set to /tmp/foo, we still want to reload /tmp/foo in case 
its contents
+                  // have changed. This doesn't apply to topic configs or 
cluster configs.
+                  reloadUpdatedFilesWithoutConfigChange(props)
+                } catch {
+                  case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating " +
+                    s"broker with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
+                    s"in ${deltaName}", t)
+                }
               }
             case _ => // nothing to do
           }
         }
       }
 
-      // Apply client quotas delta.
-      Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
-        clientQuotaMetadataManager.update(clientQuotasDelta)
+      try {
+        // Apply client quotas delta.
+        Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+          clientQuotaMetadataManager.update(clientQuotasDelta)
+        }
+      } catch {
+        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
updating client " +
+          s"quotas in ${deltaName}", t)
       }
 
       // Apply changes to ACLs. This needs to be handled carefully because 
while we are
@@ -235,20 +283,30 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       Option(delta.aclsDelta()).foreach( aclsDelta =>
         _authorizer match {
           case Some(authorizer: ClusterMetadataAuthorizer) => if 
(aclsDelta.isSnapshotDelta) {
-            // If the delta resulted from a snapshot load, we want to apply 
the new changes
-            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
-            // first snapshot load, it will also complete the futures returned 
by
-           // Authorizer#start (which we wait for before processing RPCs).
-            authorizer.loadSnapshot(newImage.acls().acls())
+            try {
+              // If the delta resulted from a snapshot load, we want to apply 
the new changes
+              // all at once using ClusterMetadataAuthorizer#loadSnapshot. If 
this is the
+              // first snapshot load, it will also complete the futures 
returned by
+              // Authorizer#start (which we wait for before processing RPCs).
+              authorizer.loadSnapshot(newImage.acls().acls())
+            } catch {
+              case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
+                s"authorizer snapshot in ${deltaName}", t)
+            }
           } else {
-            // Because the changes map is a LinkedHashMap, the deltas will be 
returned in
-            // the order they were performed.
-            aclsDelta.changes().entrySet().forEach(e =>
-              if (e.getValue.isPresent) {
-                authorizer.addAcl(e.getKey, e.getValue.get())
-              } else {
-                authorizer.removeAcl(e.getKey)
-              })
+            try {
+              // Because the changes map is a LinkedHashMap, the deltas will 
be returned in
+              // the order they were performed.
+              aclsDelta.changes().entrySet().forEach(e =>
+                if (e.getValue.isPresent) {
+                  authorizer.addAcl(e.getKey, e.getValue.get())
+                } else {
+                  authorizer.removeAcl(e.getKey)
+                })
+            } catch {
+              case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error loading " +
+                s"authorizer changes in ${deltaName}", t)
+            }
           }
           case _ => // No ClusterMetadataAuthorizer is configured. There is 
nothing to do.
         })
@@ -258,8 +316,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       }
       publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
     } catch {
-      case t: Throwable => error(s"Error publishing broker metadata at 
$highestOffsetAndEpoch", t)
-        throw t
+      case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
+        s"publishing broker metadata from ${deltaName}", t)
     } finally {
       _firstPublish = false
     }
@@ -282,7 +340,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
    * @param resignation function to call on resignation; the first parameter 
is the partition id;
    *                    the second parameter is the leader epoch
    */
-  private def updateCoordinator(
+  def updateCoordinator(
     image: MetadataImage,
     delta: MetadataDelta,
     topicName: String,
@@ -317,38 +375,60 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   }
 
   private def initializeManagers(): Unit = {
-    // Start log manager, which will perform (potentially lengthy)
-    // recovery-from-unclean-shutdown if required.
-    logManager.startup(metadataCache.getAllTopics())
-
-    // Make the LogCleaner available for reconfiguration. We can't do this 
prior to this
-    // point because LogManager#startup creates the LogCleaner object, if
-    // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
-    
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
-
-    // Start the replica manager.
-    replicaManager.startup()
-
-    // Start the group coordinator.
-    groupCoordinator.startup(() => metadataCache.numPartitions(
-      Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
-
-    // Start the transaction coordinator.
-    txnCoordinator.startup(() => metadataCache.numPartitions(
-      
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+    try {
+      // Start log manager, which will perform (potentially lengthy)
+      // recovery-from-unclean-shutdown if required.
+      logManager.startup(metadataCache.getAllTopics())
+
+      // Make the LogCleaner available for reconfiguration. We can't do this 
prior to this
+      // point because LogManager#startup creates the LogCleaner object, if
+      // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
+      
Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting 
LogManager", t)
+    }
+    try {
+      // Start the replica manager.
+      replicaManager.startup()
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting 
ReplicaManager", t)
+    }
+    try {
+      // Start the group coordinator.
+      groupCoordinator.startup(() => metadataCache.numPartitions(
+        
Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting 
GroupCoordinator", t)
+    }
+    try {
+      // Start the transaction coordinator.
+      txnCoordinator.startup(() => metadataCache.numPartitions(
+        
Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting 
TransactionCoordinator", t)
+    }
   }
 
   private def finishInitializingReplicaManager(newImage: MetadataImage): Unit 
= {
-    // Delete log directories which we're not supposed to have, according to 
the
-    // latest metadata. This is only necessary to do when we're first starting 
up. If
-    // we have to load a snapshot later, these topics will appear in 
deletedTopicIds.
-    val strayPartitions = findStrayPartitions(brokerId, newImage.topics, 
logManager.allLogs)
-    if (strayPartitions.nonEmpty) {
-      replicaManager.deleteStrayReplicas(strayPartitions)
+    try {
+      // Delete log directories which we're not supposed to have, according to 
the
+      // latest metadata. This is only necessary to do when we're first 
starting up. If
+      // we have to load a snapshot later, these topics will appear in 
deletedTopicIds.
+      val strayPartitions = findStrayPartitions(brokerId, newImage.topics, 
logManager.allLogs)
+      if (strayPartitions.nonEmpty) {
+        replicaManager.deleteStrayReplicas(strayPartitions)
+      }
+    } catch {
+      case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
deleting stray " +
+        "partitions during startup", t)
     }
-
-    // Make sure that the high water mark checkpoint thread is running for the 
replica
-    // manager.
-    replicaManager.startHighWatermarkCheckPointThread()
-  }
+    try {
+      // Make sure that the high water mark checkpoint thread is running for 
the replica
+      // manager.
+      replicaManager.startHighWatermarkCheckPointThread()
+    } catch {
+      case t: Throwable => metadataPublishingFaultHandler.handleFault("Error 
starting high " +
+        "watermark checkpoint thread during startup", t)
+    }
+}
 }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 0db6f0071c4..3e68ae85f92 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -28,6 +28,8 @@ final class BrokerServerMetrics private (metrics: Metrics) 
extends AutoCloseable
 
   val lastAppliedRecordOffset: AtomicLong = new AtomicLong(0)
   val lastAppliedRecordTimestamp: AtomicLong = new AtomicLong(0)
+  val metadataLoadErrorCount: AtomicLong = new AtomicLong(0)
+  val metadataApplyErrorCount: AtomicLong = new AtomicLong(0)
 
   val lastAppliedRecordOffsetName = metrics.metricName(
     "last-applied-record-offset",
@@ -47,6 +49,18 @@ final class BrokerServerMetrics private (metrics: Metrics) 
extends AutoCloseable
     "The difference between now and the timestamp of the last record from the 
cluster metadata partition that was applied by the broker"
   )
 
+  val metadataLoadErrorCountName = metrics.metricName(
+    "metadata-load-error-count",
+    metricGroupName,
+    "The number of errors encountered by the BrokerMetadataListener while 
loading the metadata log and generating a new MetadataDelta based on it."
+  )
+
+  val metadataApplyErrorCountName = metrics.metricName(
+    "metadata-apply-error-count",
+    metricGroupName,
+    "The number of errors encountered by the BrokerMetadataPublisher while 
applying a new MetadataImage based on the latest MetadataDelta."
+  )
+
   addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
     lastAppliedRecordOffset.get
   }
@@ -59,11 +73,21 @@ final class BrokerServerMetrics private (metrics: Metrics) 
extends AutoCloseable
     now - lastAppliedRecordTimestamp.get
   }
 
+  addMetric(metrics, metadataLoadErrorCountName) { _ =>
+    metadataLoadErrorCount.get
+  }
+
+  addMetric(metrics, metadataApplyErrorCountName) { _ =>
+    metadataApplyErrorCount.get
+  }
+
   override def close(): Unit = {
     List(
       lastAppliedRecordOffsetName,
       lastAppliedRecordTimestampName,
-      lastAppliedRecordLagMsName
+      lastAppliedRecordLagMsName,
+      metadataLoadErrorCountName,
+      metadataApplyErrorCountName
     ).foreach(metrics.removeMetric)
   }
 }
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java 
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 42120324f5f..ecee13c4982 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -24,6 +24,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaRaftServer;
 import kafka.server.MetaProperties;
+import kafka.server.metadata.BrokerServerMetrics$;
 import kafka.tools.StorageTool;
 import kafka.utils.Logging;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -36,6 +37,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.BootstrapMetadata;
 import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.MockControllerMetrics;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -128,6 +130,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
+        public Builder setMetadataFaultHandler(MockFaultHandler 
metadataFaultHandler) {
+            this.metadataFaultHandler = metadataFaultHandler;
+            return this;
+        }
+
         public KafkaClusterTestKit build() throws Exception {
             Map<Integer, ControllerServer> controllers = new HashMap<>();
             Map<Integer, BrokerServer> brokers = new HashMap<>();
@@ -189,6 +196,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         raftManager,
                         Time.SYSTEM,
                         new Metrics(),
+                        new MockControllerMetrics(),
                         Option.apply(threadNamePrefix),
                         connectFutureManager.future,
                         KafkaRaftServer.configSchema(),
@@ -245,15 +253,20 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
                             Time.SYSTEM, new Metrics(), 
Option.apply(threadNamePrefix), connectFutureManager.future);
                         raftManagers.put(node.id(), raftManager);
                     }
+                    Metrics metrics = new Metrics();
                     BrokerServer broker = new BrokerServer(
                         config,
                         nodes.brokerProperties(node.id()),
                         raftManager,
                         Time.SYSTEM,
-                        new Metrics(),
+                        metrics,
+                        BrokerServerMetrics$.MODULE$.apply(metrics),
                         Option.apply(threadNamePrefix),
                         
JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
-                        connectFutureManager.future
+                        connectFutureManager.future,
+                        fatalFaultHandler,
+                        metadataFaultHandler,
+                        metadataFaultHandler
                     );
                     brokers.put(node.id(), broker);
                 }
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 9894df9c5f7..c4ca966f9ab 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -24,6 +24,7 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.CompletableFuture
 import javax.security.auth.login.Configuration
 import kafka.raft.KafkaRaftManager
+import kafka.server.metadata.BrokerServerMetrics
 import kafka.tools.StorageTool
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
@@ -32,11 +33,12 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
-import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.MockFaultHandler
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
@@ -82,26 +84,34 @@ class ZooKeeperQuorumImplementation(
   }
 }
 
-class KRaftQuorumImplementation(val raftManager: 
KafkaRaftManager[ApiMessageAndVersion],
-                                val controllerServer: ControllerServer,
-                                val metadataDir: File,
-                                val controllerQuorumVotersFuture: 
CompletableFuture[util.Map[Integer, AddressSpec]],
-                                val clusterId: String,
-                                val log: Logging) extends QuorumImplementation 
{
+class KRaftQuorumImplementation(
+  val raftManager: KafkaRaftManager[ApiMessageAndVersion],
+  val controllerServer: ControllerServer,
+  val metadataDir: File,
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, 
AddressSpec]],
+  val clusterId: String,
+  val log: Logging,
+  val faultHandler: FaultHandler
+) extends QuorumImplementation {
   override def createBroker(
     config: KafkaConfig,
     time: Time,
     startup: Boolean,
     threadNamePrefix: Option[String],
   ): KafkaBroker = {
+    val metrics = new Metrics()
     val broker = new BrokerServer(config = config,
       metaProps = new MetaProperties(clusterId, config.nodeId),
       raftManager = raftManager,
       time = time,
-      metrics = new Metrics(),
+      metrics = metrics,
+      brokerMetrics = BrokerServerMetrics(metrics),
       threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
       initialOfflineDirs = Seq(),
-      controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+      controllerQuorumVotersFuture = controllerQuorumVotersFuture,
+      fatalFaultHandler = faultHandler,
+      metadataLoadingFaultHandler = faultHandler,
+      metadataPublishingFaultHandler = faultHandler)
     if (startup) broker.startup()
     broker
   }
@@ -306,6 +316,7 @@ abstract class QuorumTestHarness extends Logging {
         raftManager = raftManager,
         time = Time.SYSTEM,
         metrics = controllerMetrics,
+        controllerMetrics = new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), Time.SYSTEM),
         threadNamePrefix = Option(threadNamePrefix),
         controllerQuorumVotersFuture = controllerQuorumVotersFuture,
         configSchema = KafkaRaftServer.configSchema,
@@ -336,7 +347,8 @@ abstract class QuorumTestHarness extends Logging {
       metadataDir,
       controllerQuorumVotersFuture,
       metaProperties.clusterId,
-      this)
+      this,
+      faultHandler)
   }
 
   private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
diff --git 
a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala 
b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
index df114ef59e5..ea2b439c166 100644
--- a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -37,12 +37,14 @@ final class BrokerServerMetricsTest {
     val expectedMetrics = Set(
       new MetricName("last-applied-record-offset", expectedGroup, "", 
Collections.emptyMap()),
       new MetricName("last-applied-record-timestamp", expectedGroup, "", 
Collections.emptyMap()),
-      new MetricName("last-applied-record-lag-ms", expectedGroup, "", 
Collections.emptyMap())
+      new MetricName("last-applied-record-lag-ms", expectedGroup, "", 
Collections.emptyMap()),
+      new MetricName("metadata-load-error-count", expectedGroup, "", 
Collections.emptyMap()),
+      new MetricName("metadata-apply-error-count", expectedGroup, "", 
Collections.emptyMap())
     )
      
     TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
       val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => 
name.group == expectedGroup }
-      assertEquals(3, metricsMap.size)
+      assertEquals(expectedMetrics.size, metricsMap.size)
       metricsMap.foreach { case (name, metric) =>
         assertTrue(expectedMetrics.contains(name))
       }
@@ -85,4 +87,36 @@ final class BrokerServerMetricsTest {
       assertEquals(time.milliseconds - timestamp, 
lagMetric.metricValue.asInstanceOf[Long])
     }
   }
+
+  @Test
+  def testMetadataLoadErrorCount(): Unit = {
+    val time = new MockTime()
+    val metrics = new Metrics(time)
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val metadataLoadErrorCountMetric = 
metrics.metrics().get(brokerMetrics.metadataLoadErrorCountName)
+
+      assertEquals(0L, 
metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val errorCount = 100
+      brokerMetrics.metadataLoadErrorCount.set(errorCount)
+      assertEquals(errorCount, 
metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
+    }
+  }
+
+  @Test
+  def testMetadataApplyErrorCount(): Unit = {
+    val time = new MockTime()
+    val metrics = new Metrics(time)
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val metadataApplyErrorCountMetric = 
metrics.metrics().get(brokerMetrics.metadataApplyErrorCountName)
+
+      assertEquals(0L, 
metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val errorCount = 100
+      brokerMetrics.metadataApplyErrorCount.set(errorCount)
+      assertEquals(errorCount, 
metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index b21fe877f20..29de3c0f242 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -233,16 +233,21 @@ class MetricsTest extends KafkaServerTestHarness with 
Logging {
   @ValueSource(strings = Array("kraft"))
   def testKRaftControllerMetrics(quorum: String): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs"), 1)
+    Set(
+      "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+      "kafka.controller:type=KafkaController,name=GlobalPartitionCount",
+      "kafka.controller:type=KafkaController,name=GlobalTopicCount",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+      "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+      "kafka.controller:type=KafkaController,name=MetadataErrorCount",
+      "kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
+      
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
+    ).foreach(expected => {
+      assertEquals(1, 
metrics.keySet.asScala.count(_.getMBeanName.equals(expected)),
+        s"Unable to find ${expected}")
+    })
   }
 
   /**
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 6de448f2802..6c8c2599d29 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -27,12 +27,20 @@ import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, 
VersionRange}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, Test}
 
 import scala.jdk.CollectionConverters._
 
 class BrokerMetadataListenerTest {
+  private val metadataLoadingFaultHandler = new MockFaultHandler("metadata 
loading")
+
+  @AfterEach
+  def verifyNoFaults(): Unit = {
+    metadataLoadingFaultHandler.maybeRethrowFirstException()
+  }
+
   private def newBrokerMetadataListener(
     metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
     snapshotter: Option[MetadataSnapshotter] = None,
@@ -44,7 +52,8 @@ class BrokerMetadataListenerTest {
       threadNamePrefix = None,
       maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
       snapshotter = snapshotter,
-      brokerMetrics = metrics
+      brokerMetrics = metrics,
+      metadataLoadingFaultHandler = metadataLoadingFaultHandler
     )
   }
 
@@ -77,6 +86,8 @@ class BrokerMetadataListenerTest {
       assertEquals(100L, listener.highestMetadataOffset)
       assertEquals(0L, metrics.lastAppliedRecordOffset.get)
       assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(0L, metrics.metadataLoadErrorCount.get)
+      assertEquals(0L, metrics.metadataApplyErrorCount.get)
 
       val fencedTimestamp = 500L
       val fencedLastOffset = 200L
@@ -110,6 +121,8 @@ class BrokerMetadataListenerTest {
 
       assertEquals(fencedLastOffset, metrics.lastAppliedRecordOffset.get)
       assertEquals(fencedTimestamp, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(0L, metrics.metadataLoadErrorCount.get)
+      assertEquals(0L, metrics.metadataApplyErrorCount.get)
     } finally {
       listener.close()
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 6742530ef51..652b8b3a0c2 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,17 +17,16 @@
 
 package unit.kafka.server.metadata
 
-import java.util.Collections.{singleton, singletonMap}
+import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
 import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerServer, KafkaConfig}
 import kafka.server.metadata.BrokerMetadataPublisher
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.utils.Exit
@@ -35,10 +34,12 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
+import org.mockito.Mockito.doThrow
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -176,6 +177,25 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
+  private def newMockPublisher(
+    broker: BrokerServer,
+    errorHandler: FaultHandler = new MockFaultHandler("publisher")
+  ): BrokerMetadataPublisher = {
+    Mockito.spy(new BrokerMetadataPublisher(
+      conf = broker.config,
+      metadataCache = broker.metadataCache,
+      logManager = broker.logManager,
+      replicaManager = broker.replicaManager,
+      groupCoordinator = broker.groupCoordinator,
+      txnCoordinator = broker.transactionCoordinator,
+      clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+      dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
+      _authorizer = Option.empty,
+      errorHandler,
+      errorHandler
+    ))
+  }
+
   @Test
   def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
@@ -187,17 +207,7 @@ class BrokerMetadataPublisherTest {
       cluster.startup()
       cluster.waitForReadyBrokers()
       val broker = cluster.brokers().values().iterator().next()
-      val publisher = Mockito.spy(new BrokerMetadataPublisher(
-        conf = broker.config,
-        metadataCache = broker.metadataCache,
-        logManager = broker.logManager,
-        replicaManager = broker.replicaManager,
-        groupCoordinator = broker.groupCoordinator,
-        txnCoordinator = broker.transactionCoordinator,
-        clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
-        dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
-        _authorizer = Option.empty
-      ))
+      val publisher = newMockPublisher(broker)
       val numTimesReloadCalled = new AtomicInteger(0)
       
Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
         thenAnswer(new Answer[Unit]() {
@@ -227,4 +237,39 @@ class BrokerMetadataPublisherTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testExceptionInUpdateCoordinator(): Unit = {
+    val errorHandler = new MockFaultHandler("publisher")
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).
+      setMetadataFaultHandler(errorHandler).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val broker = cluster.brokers().values().iterator().next()
+      TestUtils.retry(60000) {
+        assertNotNull(broker.metadataPublisher)
+      }
+      val publisher = Mockito.spy(broker.metadataPublisher)
+      doThrow(new RuntimeException("injected 
failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any())
+      broker.metadataListener.alterPublisher(publisher).get()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.createTopics(singletonList(new NewTopic("foo", 1, 
1.toShort))).all().get()
+      } finally {
+        admin.close()
+      }
+      TestUtils.retry(60000) {
+        assertTrue(Option(errorHandler.firstException()).
+          flatMap(e => 
Option(e.getMessage())).getOrElse("(none)").contains("injected failure"))
+      }
+    } finally {
+      errorHandler.setIgnore(true)
+      cluster.close()
+    }
+  }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 6b470664d6e..ff243aebfcb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -51,6 +51,10 @@ public interface ControllerMetrics extends AutoCloseable {
 
     int preferredReplicaImbalanceCount();
 
+    void incrementMetadataErrorCount();
+
+    int metadataErrorCount();
+
     void setLastAppliedRecordOffset(long offset);
 
     long lastAppliedRecordOffset();
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index a4cc1d92cba..ef87248f134 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -765,7 +765,7 @@ public final class QuorumController implements Controller {
                             "%d of %d record(s) in the batch following last 
writeOffset %d.",
                             message.message().getClass().getSimpleName(), i, 
result.records().size(),
                             writeOffset);
-                        fatalFaultHandler.handleFault(failureMessage, e);
+                        throw fatalFaultHandler.handleFault(failureMessage, e);
                     }
                     i++;
                 }
@@ -889,7 +889,7 @@ public final class QuorumController implements Controller {
                                             "controller, which was %d of %d 
record(s) in the batch with baseOffset %d.",
                                             
message.message().getClass().getSimpleName(), i, messages.size(),
                                             batch.baseOffset());
-                                    
metadataFaultHandler.handleFault(failureMessage, e);
+                                    throw 
metadataFaultHandler.handleFault(failureMessage, e);
                                 }
                                 i++;
                             }
@@ -910,7 +910,7 @@ public final class QuorumController implements Controller {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", 
reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        fatalFaultHandler.handleFault(String.format("Asked to 
load snapshot " +
+                        throw 
fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
                             "(%s) when it is the active controller (%d)", 
reader.snapshotId(),
                             curClaimEpoch));
                     }
@@ -945,7 +945,7 @@ public final class QuorumController implements Controller {
                                         "%d record(s) in the batch with 
baseOffset %d.",
                                         
message.message().getClass().getSimpleName(), reader.snapshotId(),
                                         i, messages.size(), 
batch.baseOffset());
-                                
metadataFaultHandler.handleFault(failureMessage, e);
+                                throw 
metadataFaultHandler.handleFault(failureMessage, e);
                             }
                             i++;
                         }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 5abf0d97706..b96a687b0f3 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -26,6 +26,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
@@ -47,6 +48,8 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
         "KafkaController", "OfflinePartitionsCount");
     private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = 
getMetricName(
         "KafkaController", "PreferredReplicaImbalanceCount");
+    private final static MetricName METADATA_ERROR_COUNT = getMetricName(
+            "KafkaController", "MetadataErrorCount");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = 
getMetricName(
@@ -64,6 +67,7 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     private volatile int globalPartitionCount;
     private volatile int offlinePartitionCount;
     private volatile int preferredReplicaImbalanceCount;
+    private volatile AtomicInteger metadataErrorCount;
     private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
@@ -74,6 +78,7 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     private final Gauge<Integer> globalTopicCountGauge;
     private final Gauge<Integer> offlinePartitionCountGauge;
     private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
+    private final Gauge<Integer> metadataErrorCountGauge;
     private final Gauge<Long> lastAppliedRecordOffsetGauge;
     private final Gauge<Long> lastCommittedRecordOffsetGauge;
     private final Gauge<Long> lastAppliedRecordTimestampGauge;
@@ -93,6 +98,7 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
         this.globalPartitionCount = 0;
         this.offlinePartitionCount = 0;
         this.preferredReplicaImbalanceCount = 0;
+        this.metadataErrorCount = new AtomicInteger(0);
         this.activeControllerCount = 
registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
             @Override
             public Integer value() {
@@ -137,6 +143,12 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
                 return preferredReplicaImbalanceCount;
             }
         });
+        this.metadataErrorCountGauge = registry.newGauge(METADATA_ERROR_COUNT, 
new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return metadataErrorCount.get();
+            }
+        });
         lastAppliedRecordOffsetGauge = 
registry.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
             @Override
             public Long value() {
@@ -242,6 +254,15 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
         return this.preferredReplicaImbalanceCount;
     }
 
+    @Override
+    public void incrementMetadataErrorCount() {
+        this.metadataErrorCount.getAndIncrement();
+    }
+
+    @Override
+    public int metadataErrorCount() {
+        return this.metadataErrorCount.get();
+    }
     @Override
     public void setLastAppliedRecordOffset(long offset) {
         lastAppliedRecordOffset.set(offset);
@@ -276,12 +297,15 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     public void close() {
         Arrays.asList(
             ACTIVE_CONTROLLER_COUNT,
+            FENCED_BROKER_COUNT,
+            ACTIVE_BROKER_COUNT,
             EVENT_QUEUE_TIME_MS,
             EVENT_QUEUE_PROCESSING_TIME_MS,
             GLOBAL_TOPIC_COUNT,
             GLOBAL_PARTITION_COUNT,
             OFFLINE_PARTITION_COUNT,
             PREFERRED_REPLICA_IMBALANCE_COUNT,
+            METADATA_ERROR_COUNT,
             LAST_APPLIED_RECORD_OFFSET,
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
deleted file mode 100644
index c57ce46fb35..00000000000
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.fault;
-
-
-/**
- * A fault that we encountered while we replayed cluster metadata.
- */
-public class MetadataFaultException extends RuntimeException {
-    public MetadataFaultException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MetadataFaultException(String message) {
-        super(message);
-    }
-}
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
deleted file mode 100644
index e9f71b80e67..00000000000
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.fault;
-
-import org.apache.kafka.server.fault.FaultHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Handles faults in Kafka metadata management.
- */
-public class MetadataFaultHandler implements FaultHandler {
-    private static final Logger log = 
LoggerFactory.getLogger(MetadataFaultHandler.class);
-
-    @Override
-    public void handleFault(String failureMessage, Throwable cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
-        throw new MetadataFaultException("Encountered metadata fault: " + 
failureMessage, cause);
-    }
-}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 5991fcc34f3..ca13d90ddea 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.controller;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public final class MockControllerMetrics implements ControllerMetrics {
     private volatile boolean active = false;
     private volatile int fencedBrokers = 0;
@@ -25,6 +27,7 @@ public final class MockControllerMetrics implements 
ControllerMetrics {
     private volatile int partitions = 0;
     private volatile int offlinePartitions = 0;
     private volatile int preferredReplicaImbalances = 0;
+    private volatile AtomicInteger metadataErrors = new AtomicInteger(0);
     private volatile long lastAppliedRecordOffset = 0;
     private volatile long lastCommittedRecordOffset = 0;
     private volatile long lastAppliedRecordTimestamp = 0;
@@ -111,6 +114,16 @@ public final class MockControllerMetrics implements 
ControllerMetrics {
         return this.preferredReplicaImbalances;
     }
 
+    @Override
+    public void incrementMetadataErrorCount() {
+        this.metadataErrors.getAndIncrement();
+    }
+
+    @Override
+    public int metadataErrorCount() {
+        return this.metadataErrors.get();
+    }
+
     @Override
     public void setLastAppliedRecordOffset(long offset) {
         lastAppliedRecordOffset = offset;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
index 2ab99955943..400b860197e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -36,10 +36,13 @@ public class QuorumControllerMetricsTest {
         String expectedType = "KafkaController";
         Set<String> expectedMetricNames = Utils.mkSet(
             "ActiveControllerCount",
+            "FencedBrokerCount",
+            "ActiveBrokerCount",
             "GlobalTopicCount",
             "GlobalPartitionCount",
             "OfflinePartitionsCount",
             "PreferredReplicaImbalanceCount",
+            "MetadataErrorCount",
             "LastAppliedRecordLagMs",
             "LastAppliedRecordOffset",
             "LastAppliedRecordTimestamp",
@@ -125,6 +128,25 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testMetadataErrorCount() {
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry, time)) {
+                @SuppressWarnings("unchecked")
+                Gauge<Integer> metadataErrorCount = (Gauge<Integer>) registry
+                        .allMetrics()
+                        .get(metricName("KafkaController", 
"MetadataErrorCount"));
+                assertEquals(0, metadataErrorCount.value());
+                quorumControllerMetrics.incrementMetadataErrorCount();
+                assertEquals(1, metadataErrorCount.value());
+            }
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricsCreatedAndRemovedUponClose(String 
expectedType, Set<String> expectedMetricNames) {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
@@ -151,10 +173,18 @@ public class QuorumControllerMetricsTest {
     }
 
     private static void assertMetricsCreated(MetricsRegistry registry, 
Set<String> expectedMetricNames, String expectedType) {
+        assertEquals(registry.allMetrics().keySet().stream()
+                .filter(k -> k.getType() == expectedType).count(),
+                expectedMetricNames.size());
         expectedMetricNames.forEach(expectedName -> {
             MetricName expectMetricName = metricName(expectedType, 
expectedName);
             assertTrue(registry.allMetrics().containsKey(expectMetricName), 
"Missing metric: " + expectMetricName);
         });
+        registry.allMetrics().forEach((actualMetricName, actualMetric) -> {
+            if (actualMetricName.getType() == expectedType) {
+                
assertTrue(expectedMetricNames.contains(actualMetricName.getName()), 
"Unexpected metric: " + actualMetricName);
+            }
+        });
     }
 
     private static void assertMetricsRemoved(MetricsRegistry registry, 
Set<String> expectedMetricNames, String expectedType) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
index 4c03eacc32f..5efc145ea94 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.server.fault;
 
-import org.slf4j.Logger;
-
 
 /**
  * Handle a server fault.
@@ -28,9 +26,11 @@ public interface FaultHandler {
      * Handle a fault.
      *
      * @param failureMessage        The failure message to log.
+     *
+     * @return                      The fault exception.
      */
-    default void handleFault(String failureMessage) {
-        handleFault(failureMessage, null);
+    default RuntimeException handleFault(String failureMessage) {
+        return handleFault(failureMessage, null);
     }
 
     /**
@@ -38,21 +38,8 @@ public interface FaultHandler {
      *
      * @param failureMessage        The failure message to log.
      * @param cause                 The exception that caused the problem, or 
null.
-     */
-    void handleFault(String failureMessage, Throwable cause);
-
-    /**
-     * Log a failure message about a fault.
      *
-     * @param log               The log4j logger.
-     * @param failureMessage    The failure message.
-     * @param cause             The exception which caused the failure, or 
null.
+     * @return                      The fault exception.
      */
-    static void logFailureMessage(Logger log, String failureMessage, Throwable 
cause) {
-        if (cause == null) {
-            log.error("Encountered fatal fault: {}", failureMessage);
-        } else {
-            log.error("Encountered fatal fault: {}", failureMessage, cause);
-        }
-    }
+    RuntimeException handleFault(String failureMessage, Throwable cause);
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
similarity index 83%
rename from 
server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
rename to 
server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
index ef9b11bdeb5..ec3b7dc4b0c 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
@@ -19,10 +19,10 @@ package org.apache.kafka.server.fault;
 
 
 /**
- * An exception thrown by MockFaultHandler.
+ * An exception thrown by a fault handler.
  */
-public class MockFaultHandlerException extends RuntimeException {
-    public MockFaultHandlerException(String failureMessage, Throwable cause) {
+public class FaultHandlerException extends RuntimeException {
+    public FaultHandlerException(String failureMessage, Throwable cause) {
         super(failureMessage, cause);
         // If a cause exception was provided, set our stack trace to its stack trace. This is
         // useful in junit tests where a limited number of stack frames are 
printed, and usually
@@ -32,7 +32,7 @@ public class MockFaultHandlerException extends 
RuntimeException {
         }
     }
 
-    public MockFaultHandlerException(String failureMessage) {
+    public FaultHandlerException(String failureMessage) {
         this(failureMessage, null);
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java
new file mode 100644
index 00000000000..9242cef4eb9
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A fault handler which logs an error message and executes a runnable.
+ */
+public class LoggingFaultHandler implements FaultHandler {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggingFaultHandler.class);
+    private final String type;
+    private final Runnable action;
+
+    public LoggingFaultHandler(
+        String type,
+        Runnable action
+    ) {
+        this.type = type;
+        this.action = action;
+    }
+
+    @Override
+    public RuntimeException handleFault(String failureMessage, Throwable 
cause) {
+        if (cause == null) {
+            log.error("Encountered {} fault: {}", type, failureMessage);
+        } else {
+            log.error("Encountered {} fault: {}", type, failureMessage, cause);
+        }
+        try {
+            action.run();
+        } catch (Throwable e) {
+            log.error("Failed to run LoggingFaultHandler action.", e);
+        }
+        return new FaultHandlerException(failureMessage, cause);
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
index e3b9f25a3be..b7c0d241a2a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -30,8 +30,13 @@ public class ProcessExitingFaultHandler implements 
FaultHandler {
     private static final Logger log = 
LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
 
     @Override
-    public void handleFault(String failureMessage, Throwable cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
+    public RuntimeException handleFault(String failureMessage, Throwable 
cause) {
+        if (cause == null) {
+            log.error("Encountered fatal fault: {}", failureMessage);
+        } else {
+            log.error("Encountered fatal fault: {}", failureMessage, cause);
+        }
         Exit.exit(1);
+        return null;
     }
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java
new file mode 100644
index 00000000000..1a11098a21b
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Tests LoggingFaultHandler
+ */
+public class LoggingFaultHandlerTest {
+    /**
+     * Test handling faults with and without exceptions.
+     */
+    @Test
+    public void testHandleFault() {
+        AtomicInteger counter = new AtomicInteger(0);
+        LoggingFaultHandler handler = new LoggingFaultHandler("test", () -> {
+            counter.incrementAndGet();
+        });
+        handler.handleFault("uh oh");
+        assertEquals(1, counter.get());
+        handler.handleFault("uh oh", new RuntimeException("yikes"));
+        assertEquals(2, counter.get());
+    }
+
+    /**
+     * Test handling an exception in the action callback.
+     */
+    @Test
+    public void testHandleExceptionInAction() {
+        LoggingFaultHandler handler = new LoggingFaultHandler("test", () -> {
+            throw new RuntimeException("action failed");
+        });
+        handler.handleFault("uh oh"); // should not throw
+        handler.handleFault("uh oh", new RuntimeException("yikes")); // should 
not throw
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
index 39b3ed07847..e49f2bdc6c2 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -29,7 +29,7 @@ public class MockFaultHandler implements FaultHandler {
     private static final Logger log = 
LoggerFactory.getLogger(MockFaultHandler.class);
 
     private final String name;
-    private MockFaultHandlerException firstException = null;
+    private FaultHandlerException firstException = null;
     private boolean ignore = false;
 
     public MockFaultHandler(String name) {
@@ -37,16 +37,20 @@ public class MockFaultHandler implements FaultHandler {
     }
 
     @Override
-    public synchronized void handleFault(String failureMessage, Throwable 
cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
-        MockFaultHandlerException e = (cause == null) ?
-                new MockFaultHandlerException(name + ": " + failureMessage) :
-                new MockFaultHandlerException(name + ": " + failureMessage +
+    public synchronized RuntimeException handleFault(String failureMessage, 
Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered {} fault: {}", name, failureMessage);
+        } else {
+            log.error("Encountered {} fault: {}", name, failureMessage, cause);
+        }
+        FaultHandlerException e = (cause == null) ?
+                new FaultHandlerException(name + ": " + failureMessage) :
+                new FaultHandlerException(name + ": " + failureMessage +
                         ": " + cause.getMessage(), cause);
         if (firstException == null) {
             firstException = e;
         }
-        throw e;
+        return firstException;
     }
 
     public synchronized void maybeRethrowFirstException() {
@@ -55,7 +59,7 @@ public class MockFaultHandler implements FaultHandler {
         }
     }
 
-    public synchronized MockFaultHandlerException firstException() {
+    public synchronized FaultHandlerException firstException() {
         return firstException;
     }
 

Reply via email to