This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 555744da70 KAFKA-14124: improve quorum controller fault handling
(#12447)
555744da70 is described below
commit 555744da7040f1ad91decc3cf2b813285af60aa2
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Aug 4 22:49:45 2022 -0700
KAFKA-14124: improve quorum controller fault handling (#12447)
Before trying to commit a batch of records to the __cluster_metadata log, the active controller
should try to apply them to its current in-memory state. If this application process fails, the
active controller process should exit, allowing another node to take leadership. This will prevent
most bad metadata records from ending up in the log and help to surface errors during testing.
Similarly, if the active controller attempts to renounce leadership, and the renunciation process
itself fails, the process should exit. This will help avoid bugs where the active controller
continues in an undefined state.
In contrast, standby controllers that experience metadata application errors should continue on, in
order to avoid a scenario where a bad record brings down the whole controller cluster. The
intended effect of these changes is to make it harder to commit a bad record to the metadata log,
but to continue to ride out the bad record as well as possible if such a record does get committed.
This PR introduces the FaultHandler interface to implement these concepts. In JUnit tests, we use a
FaultHandler implementation which does not exit the process. This allows us to avoid terminating
the Gradle test runner, which would be very disruptive. It also allows us to ensure that the test
surfaces these exceptions, which we previously were not doing (the mock fault handler stores the
exception).
In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from the
renounce() function. This bug could have resulted in the Raft layer not being informed of an active
controller resigning.
Reviewers: David Arthur <[email protected]>
---
build.gradle | 2 +
checkstyle/import-control-core.xml | 1 +
checkstyle/import-control.xml | 4 +
checkstyle/suppressions.xml | 2 +
.../main/scala/kafka/server/ControllerServer.scala | 10 +-
.../main/scala/kafka/server/KafkaRaftServer.scala | 6 +-
.../java/kafka/testkit/KafkaClusterTestKit.java | 36 +-
.../kafka/server/QuorumTestHarness.scala | 6 +
.../apache/kafka/controller/QuorumController.java | 382 ++++++++++++---------
.../metadata/fault/MetadataFaultException.java | 32 ++
.../kafka/metadata/fault/MetadataFaultHandler.java | 36 ++
.../kafka/controller/QuorumControllerTest.java | 25 ++
.../kafka/controller/QuorumControllerTestEnv.java | 15 +
.../apache/kafka/server/fault/FaultHandler.java | 58 ++++
.../server/fault/ProcessExitingFaultHandler.java | 37 ++
.../kafka/server/fault/MockFaultHandler.java | 65 ++++
.../server/fault/MockFaultHandlerException.java | 38 ++
17 files changed, 586 insertions(+), 169 deletions(-)
diff --git a/build.gradle b/build.gradle
index 54068f2977..e1f773075a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -897,6 +897,7 @@ project(':core') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
+ testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@@ -1179,6 +1180,7 @@ project(':metadata') {
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
+ testImplementation project(':server-common').sourceSets.test.output
generator project(':generator')
}
diff --git a/checkstyle/import-control-core.xml
b/checkstyle/import-control-core.xml
index 28b325b093..4042cba402 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -54,6 +54,7 @@
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.fault" />
</subpackage>
<subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 211d23ff60..4b07a26cba 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -232,6 +232,7 @@
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.snapshot" />
@@ -276,6 +277,9 @@
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
</subpackage>
+ <subpackage name="fault">
+ <allow pkg="org.apache.kafka.server.fault" />
+ </subpackage>
</subpackage>
<subpackage name="metalog">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe..bec3da1637 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,8 @@
<suppress
checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity"
files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+ <suppress checks="MethodLength"
+ files="(KafkaClusterTestKit).java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index cff88d2b6b..212c092e1a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -21,7 +21,6 @@ import java.util
import java.util.OptionalLong
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
-
import kafka.cluster.Broker.ServerInfo
import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -46,6 +45,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -65,7 +65,9 @@ class ControllerServer(
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer,
AddressSpec]],
val configSchema: KafkaConfigSchema,
val raftApiVersions: ApiVersions,
- val bootstrapMetadata: BootstrapMetadata
+ val bootstrapMetadata: BootstrapMetadata,
+ val metadataFaultHandler: FaultHandler,
+ val fatalFaultHandler: FaultHandler,
) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
@@ -204,7 +206,9 @@ class ControllerServer(
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals).
- setBootstrapMetadata(bootstrapMetadata)
+ setBootstrapMetadata(bootstrapMetadata).
+ setMetadataFaultHandler(metadataFaultHandler).
+ setFatalFaultHandler(fatalFaultHandler)
}
authorizer match {
case Some(a: ClusterMetadataAuthorizer) =>
controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 07a3118372..e7cf8f8f1f 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.fault.MetadataFaultHandler
import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.nio.file.Paths
@@ -106,7 +108,9 @@ class KafkaRaftServer(
controllerQuorumVotersFuture,
KafkaRaftServer.configSchema,
raftManager.apiVersions,
- bootstrapMetadata
+ bootstrapMetadata,
+ new MetadataFaultHandler(),
+ new ProcessExitingFaultHandler(),
))
} else {
None
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index c961d71bbe..42120324f5 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
public static class Builder {
private TestKitNodes nodes;
private Map<String, String> configProps = new HashMap<>();
+ private MockFaultHandler metadataFaultHandler = new
MockFaultHandler("metadataFaultHandler");
+ private MockFaultHandler fatalFaultHandler = new
MockFaultHandler("fatalFaultHandler");
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
@@ -190,7 +193,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
connectFutureManager.future,
KafkaRaftServer.configSchema(),
raftManager.apiVersions(),
- bootstrapMetadata
+ bootstrapMetadata,
+ metadataFaultHandler,
+ fatalFaultHandler
);
controllers.put(node.id(), controller);
controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -273,7 +278,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
throw e;
}
return new KafkaClusterTestKit(executorService, nodes, controllers,
- brokers, raftManagers, connectFutureManager, baseDirectory);
+ brokers, raftManagers, connectFutureManager, baseDirectory,
+ metadataFaultHandler, fatalFaultHandler);
}
private String listeners(int node) {
@@ -314,14 +320,20 @@ public class KafkaClusterTestKit implements AutoCloseable
{
private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>>
raftManagers;
private final ControllerQuorumVotersFutureManager
controllerQuorumVotersFutureManager;
private final File baseDirectory;
-
- private KafkaClusterTestKit(ExecutorService executorService,
- TestKitNodes nodes,
- Map<Integer, ControllerServer> controllers,
- Map<Integer, BrokerServer> brokers,
- Map<Integer,
KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
- ControllerQuorumVotersFutureManager
controllerQuorumVotersFutureManager,
- File baseDirectory) {
+ private final MockFaultHandler metadataFaultHandler;
+ private final MockFaultHandler fatalFaultHandler;
+
+ private KafkaClusterTestKit(
+ ExecutorService executorService,
+ TestKitNodes nodes,
+ Map<Integer, ControllerServer> controllers,
+ Map<Integer, BrokerServer> brokers,
+ Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
+ ControllerQuorumVotersFutureManager
controllerQuorumVotersFutureManager,
+ File baseDirectory,
+ MockFaultHandler metadataFaultHandler,
+ MockFaultHandler fatalFaultHandler
+ ) {
this.executorService = executorService;
this.nodes = nodes;
this.controllers = controllers;
@@ -329,6 +341,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.raftManagers = raftManagers;
this.controllerQuorumVotersFutureManager =
controllerQuorumVotersFutureManager;
this.baseDirectory = baseDirectory;
+ this.metadataFaultHandler = metadataFaultHandler;
+ this.fatalFaultHandler = fatalFaultHandler;
}
public void format() throws Exception {
@@ -520,6 +534,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.MINUTES);
}
+ metadataFaultHandler.maybeRethrowFirstException();
+ fatalFaultHandler.maybeRethrowFirstException();
}
private void waitForAllFutures(List<Entry<String, Future<?>>>
futureEntries)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index a2393cdccb..9894df9c5f 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
@@ -188,6 +189,8 @@ abstract class QuorumTestHarness extends Logging {
}
}
+ val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")
+
// Note: according to the junit documentation: "JUnit Jupiter does not
guarantee the execution
// order of multiple @BeforeEach methods that are declared within a single
test class or test
// interface." Therefore, if you have things you would like to do before
each test case runs, it
@@ -308,6 +311,8 @@ abstract class QuorumTestHarness extends Logging {
configSchema = KafkaRaftServer.configSchema,
raftApiVersions = raftManager.apiVersions,
bootstrapMetadata = BootstrapMetadata.create(metadataVersion,
bootstrapRecords.asJava),
+ metadataFaultHandler = faultHandler,
+ fatalFaultHandler = faultHandler,
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e)
=> {
if (e != null) {
@@ -374,6 +379,7 @@ abstract class QuorumTestHarness extends Logging {
}
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
+ faultHandler.maybeRethrowFirstException()
}
// Trigger session expiry by reusing the session id in another client
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0290e0040c..a4cc1d92cb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -91,6 +91,7 @@ import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
@@ -149,6 +150,8 @@ public final class QuorumController implements Controller {
static public class Builder {
private final int nodeId;
private final String clusterId;
+ private FaultHandler fatalFaultHandler = null;
+ private FaultHandler metadataFaultHandler = null;
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
@@ -175,6 +178,16 @@ public final class QuorumController implements Controller {
this.clusterId = clusterId;
}
+ public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
+ this.fatalFaultHandler = fatalFaultHandler;
+ return this;
+ }
+
+ public Builder setMetadataFaultHandler(FaultHandler
metadataFaultHandler) {
+ this.metadataFaultHandler = metadataFaultHandler;
+ return this;
+ }
+
public int nodeId() {
return nodeId;
}
@@ -287,6 +300,10 @@ public final class QuorumController implements Controller {
throw new IllegalStateException("You must specify an initial
metadata.version using the kafka-storage tool.");
} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum
features");
+ } else if (fatalFaultHandler == null) {
+ throw new IllegalStateException("You must specify a fatal
fault handler.");
+ } else if (metadataFaultHandler == null) {
+ throw new IllegalStateException("You must specify a metadata
fault handler.");
}
if (threadNamePrefix == null) {
@@ -304,6 +321,8 @@ public final class QuorumController implements Controller {
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix
+ "QuorumController");
return new QuorumController(
+ fatalFaultHandler,
+ metadataFaultHandler,
logContext,
nodeId,
clusterId,
@@ -426,12 +445,18 @@ public final class QuorumController implements Controller
{
exception.getClass().getSimpleName(), deltaUs);
return exception;
}
- log.warn("{}: failed with unknown server exception {} at epoch {} in
{} us. " +
- "Renouncing leadership and reverting to the last committed
offset {}.",
- name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
- lastCommittedOffset, exception);
- raftClient.resign(curClaimEpoch);
- renounce();
+ if (isActiveController()) {
+ log.warn("{}: failed with unknown server exception {} at epoch {}
in {} us. " +
+ "Renouncing leadership and reverting to the last committed
offset {}.",
+ name, exception.getClass().getSimpleName(), curClaimEpoch,
deltaUs,
+ lastCommittedOffset, exception);
+ renounce();
+ } else {
+ log.warn("{}: failed with unknown server exception {} in {} us. "
+
+ "The controller is already in standby mode.",
+ name, exception.getClass().getSimpleName(), deltaUs,
+ exception);
+ }
return new UnknownServerException(exception);
}
@@ -702,7 +727,7 @@ public final class QuorumController implements Controller {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now -
eventCreatedTimeNs));
int controllerEpoch = curClaimEpoch;
- if (controllerEpoch == -1) {
+ if (!isActiveController()) {
throw newNotControllerException();
}
startProcessingTimeNs = OptionalLong.of(now);
@@ -728,9 +753,26 @@ public final class QuorumController implements Controller {
"reaches offset {}", this, resultAndOffset.offset());
}
} else {
- // If the operation returned a batch of records, those records
need to be
- // written before we can return our result to the user. Here,
we hand off
- // the batch of records to the raft client. They will be
written out
+ // Start by trying to apply the record to our in-memory state.
This should always
+ // succeed; if it does not, that's a fatal error. It is
important to do this before
+ // scheduling the record for Raft replication.
+ int i = 1;
+ for (ApiMessageAndVersion message : result.records()) {
+ try {
+ replay(message.message(), Optional.empty());
+ } catch (Throwable e) {
+ String failureMessage = String.format("Unable to apply
%s record, which was " +
+ "%d of %d record(s) in the batch following last
writeOffset %d.",
+ message.message().getClass().getSimpleName(), i,
result.records().size(),
+ writeOffset);
+ fatalFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
+ }
+
+ // If the operation returned a batch of records, and those
records could be applied,
+ // they need to be written before we can return our result to
the user. Here, we
+ // hand off the batch of records to the raft client. They
will be written out
// asynchronously.
final long offset;
if (result.isAtomic()) {
@@ -741,9 +783,6 @@ public final class QuorumController implements Controller {
op.processBatchEndOffset(offset);
updateWriteOffset(offset);
resultAndOffset = ControllerResultAndOffset.of(offset, result);
- for (ApiMessageAndVersion message : result.records()) {
- replay(message.message(), Optional.empty(), offset);
- }
snapshotRegistry.getOrCreateSnapshot(offset);
log.debug("Read-write operation {} will be completed when the
log " +
@@ -789,9 +828,9 @@ public final class QuorumController implements Controller {
return event.future();
}
- private <T> CompletableFuture<T> appendWriteEvent(String name,
- OptionalLong deadlineNs,
-
ControllerWriteOperation<T> op) {
+ <T> CompletableFuture<T> appendWriteEvent(String name,
+ OptionalLong deadlineNs,
+ ControllerWriteOperation<T> op) {
ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
if (deadlineNs.isPresent()) {
queue.appendWithDeadline(deadlineNs.getAsLong(), event);
@@ -841,11 +880,20 @@ public final class QuorumController implements Controller
{
"offset {} and epoch {}.", offset,
epoch);
}
}
- for (ApiMessageAndVersion messageAndVersion :
messages) {
- replay(messageAndVersion.message(),
Optional.empty(), offset);
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message(),
Optional.empty());
+ } catch (Throwable e) {
+ String failureMessage =
String.format("Unable to apply %s record on standby " +
+ "controller, which was %d of %d
record(s) in the batch with baseOffset %d.",
+
message.message().getClass().getSimpleName(), i, messages.size(),
+ batch.baseOffset());
+
metadataFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
}
}
-
updateLastCommittedState(offset, epoch,
batch.appendTimestamp());
processedRecordsSize += batch.sizeInBytes();
}
@@ -862,13 +910,9 @@ public final class QuorumController implements Controller {
appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]",
reader.snapshotId()), () -> {
try {
if (isActiveController()) {
- throw new IllegalStateException(
- String.format(
- "Asked to load snapshot (%s) when it is the
active controller (%d)",
- reader.snapshotId(),
- curClaimEpoch
- )
- );
+ fatalFaultHandler.handleFault(String.format("Asked to
load snapshot " +
+ "(%s) when it is the active controller (%d)",
reader.snapshotId(),
+ curClaimEpoch));
}
log.info("Starting to replay snapshot ({}), from last
commit offset ({}) and epoch ({})",
reader.snapshotId(), lastCommittedOffset,
lastCommittedEpoch);
@@ -882,26 +926,28 @@ public final class QuorumController implements Controller
{
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
- log.trace(
- "Replaying snapshot ({}) batch with last
offset of {}: {}",
- reader.snapshotId(),
- offset,
- messages
- .stream()
- .map(ApiMessageAndVersion::toString)
- .collect(Collectors.joining(", "))
- );
+ log.trace("Replaying snapshot ({}) batch with
last offset of {}: {}",
+ reader.snapshotId(), offset,
messages.stream().map(ApiMessageAndVersion::toString).
+ collect(Collectors.joining(", ")));
} else {
- log.debug(
- "Replaying snapshot ({}) batch with last
offset of {}",
- reader.snapshotId(),
- offset
- );
+ log.debug("Replaying snapshot ({}) batch with
last offset of {}",
+ reader.snapshotId(), offset);
}
}
- for (ApiMessageAndVersion messageAndVersion :
messages) {
- replay(messageAndVersion.message(),
Optional.of(reader.snapshotId()), offset);
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message(),
Optional.of(reader.snapshotId()));
+ } catch (Throwable e) {
+ String failureMessage = String.format("Unable
to apply %s record " +
+ "from snapshot %s on standby
controller, which was %d of " +
+ "%d record(s) in the batch with
baseOffset %d.",
+
message.message().getClass().getSimpleName(), reader.snapshotId(),
+ i, messages.size(),
batch.baseOffset());
+
metadataFaultHandler.handleFault(failureMessage, e);
+ }
+ i++;
}
}
updateLastCommittedState(
@@ -968,10 +1014,14 @@ public final class QuorumController implements
Controller {
if (exception != null) {
log.error("Failed to bootstrap metadata.",
exception);
appendRaftEvent("bootstrapMetadata[" +
curClaimEpoch + "]", () -> {
- log.warn("Renouncing the leadership at
oldEpoch {} since we could not bootstrap " +
- "metadata. Reverting to last
committed offset {}.",
- curClaimEpoch, lastCommittedOffset);
- renounce();
+ if (isActiveController()) {
+ log.warn("Renouncing the leadership at
oldEpoch {} since we could not bootstrap " +
+ "metadata. Reverting
to last committed offset {}.",
+ curClaimEpoch,
lastCommittedOffset);
+ renounce();
+ } else {
+ log.warn("Unable to bootstrap metadata
on standby controller.");
+ }
});
}
});
@@ -998,10 +1048,12 @@ public final class QuorumController implements
Controller {
});
} else if (isActiveController()) {
appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () ->
{
- log.warn("Renouncing the leadership at oldEpoch {} due to
a metadata " +
- "log event. Reverting to last committed offset
{}.", curClaimEpoch,
- lastCommittedOffset);
- renounce();
+ if (isActiveController()) {
+ log.warn("Renouncing the leadership at oldEpoch {} due
to a metadata " +
+ "log event. Reverting to last committed offset
{}.", curClaimEpoch,
+ lastCommittedOffset);
+ renounce();
+ }
});
}
}
@@ -1078,26 +1130,34 @@ public final class QuorumController implements
Controller {
}
private void renounce() {
- curClaimEpoch = -1;
- controllerMetrics.setActive(false);
- purgatory.failAll(newNotControllerException());
-
- if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
- newBytesSinceLastSnapshot = 0;
- snapshotRegistry.revertToSnapshot(lastCommittedOffset);
- authorizer.ifPresent(a ->
a.loadSnapshot(aclControlManager.idToAcl()));
- } else {
- resetState();
- raftClient.unregister(metaLogListener);
- metaLogListener = new QuorumMetaLogListener();
- raftClient.register(metaLogListener);
+ try {
+ if (curClaimEpoch == -1) {
+ throw new RuntimeException("Cannot renounce leadership because
we are not the " +
+ "current leader.");
+ }
+ raftClient.resign(curClaimEpoch);
+ curClaimEpoch = -1;
+ controllerMetrics.setActive(false);
+ purgatory.failAll(newNotControllerException());
+
+ if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+ newBytesSinceLastSnapshot = 0;
+ snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+ authorizer.ifPresent(a ->
a.loadSnapshot(aclControlManager.idToAcl()));
+ } else {
+ resetState();
+ raftClient.unregister(metaLogListener);
+ metaLogListener = new QuorumMetaLogListener();
+ raftClient.register(metaLogListener);
+ }
+ updateWriteOffset(-1);
+ clusterControl.deactivate();
+ cancelMaybeFenceReplicas();
+ cancelMaybeBalancePartitionLeaders();
+ cancelNextWriteNoOpRecord();
+ } catch (Throwable e) {
+ fatalFaultHandler.handleFault("exception while renouncing
leadership", e);
}
-
- updateWriteOffset(-1);
- clusterControl.deactivate();
- cancelMaybeFenceReplicas();
- cancelMaybeBalancePartitionLeaders();
- cancelNextWriteNoOpRecord();
}
private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1246,70 +1306,60 @@ public final class QuorumController implements
Controller {
}
@SuppressWarnings("unchecked")
- private void replay(ApiMessage message, Optional<OffsetAndEpoch>
snapshotId, long offset) {
- try {
- MetadataRecordType type =
MetadataRecordType.fromId(message.apiKey());
- switch (type) {
- case REGISTER_BROKER_RECORD:
- clusterControl.replay((RegisterBrokerRecord) message);
- break;
- case UNREGISTER_BROKER_RECORD:
- clusterControl.replay((UnregisterBrokerRecord) message);
- break;
- case TOPIC_RECORD:
- replicationControl.replay((TopicRecord) message);
- break;
- case PARTITION_RECORD:
- replicationControl.replay((PartitionRecord) message);
- break;
- case CONFIG_RECORD:
- configurationControl.replay((ConfigRecord) message);
- break;
- case PARTITION_CHANGE_RECORD:
- replicationControl.replay((PartitionChangeRecord) message);
- break;
- case FENCE_BROKER_RECORD:
- clusterControl.replay((FenceBrokerRecord) message);
- break;
- case UNFENCE_BROKER_RECORD:
- clusterControl.replay((UnfenceBrokerRecord) message);
- break;
- case REMOVE_TOPIC_RECORD:
- replicationControl.replay((RemoveTopicRecord) message);
- break;
- case FEATURE_LEVEL_RECORD:
- featureControl.replay((FeatureLevelRecord) message);
- handleFeatureControlChange();
- break;
- case CLIENT_QUOTA_RECORD:
- clientQuotaControlManager.replay((ClientQuotaRecord)
message);
- break;
- case PRODUCER_IDS_RECORD:
- producerIdControlManager.replay((ProducerIdsRecord)
message);
- break;
- case BROKER_REGISTRATION_CHANGE_RECORD:
- clusterControl.replay((BrokerRegistrationChangeRecord)
message);
- break;
- case ACCESS_CONTROL_ENTRY_RECORD:
- aclControlManager.replay((AccessControlEntryRecord)
message, snapshotId);
- break;
- case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
- aclControlManager.replay((RemoveAccessControlEntryRecord)
message, snapshotId);
- break;
- case NO_OP_RECORD:
- // NoOpRecord is an empty record and doesn't need to be
replayed
- break;
- default:
- throw new RuntimeException("Unhandled record type " +
type);
- }
- } catch (Exception e) {
- if (snapshotId.isPresent()) {
- log.error("Error replaying record {} from snapshot {} at last
offset {}.",
- message.toString(), snapshotId.get(), offset, e);
- } else {
- log.error("Error replaying record {} at last offset {}.",
- message.toString(), offset, e);
- }
+ private void replay(ApiMessage message, Optional<OffsetAndEpoch>
snapshotId) {
+ MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+ switch (type) {
+ case REGISTER_BROKER_RECORD:
+ clusterControl.replay((RegisterBrokerRecord) message);
+ break;
+ case UNREGISTER_BROKER_RECORD:
+ clusterControl.replay((UnregisterBrokerRecord) message);
+ break;
+ case TOPIC_RECORD:
+ replicationControl.replay((TopicRecord) message);
+ break;
+ case PARTITION_RECORD:
+ replicationControl.replay((PartitionRecord) message);
+ break;
+ case CONFIG_RECORD:
+ configurationControl.replay((ConfigRecord) message);
+ break;
+ case PARTITION_CHANGE_RECORD:
+ replicationControl.replay((PartitionChangeRecord) message);
+ break;
+ case FENCE_BROKER_RECORD:
+ clusterControl.replay((FenceBrokerRecord) message);
+ break;
+ case UNFENCE_BROKER_RECORD:
+ clusterControl.replay((UnfenceBrokerRecord) message);
+ break;
+ case REMOVE_TOPIC_RECORD:
+ replicationControl.replay((RemoveTopicRecord) message);
+ break;
+ case FEATURE_LEVEL_RECORD:
+ featureControl.replay((FeatureLevelRecord) message);
+ handleFeatureControlChange();
+ break;
+ case CLIENT_QUOTA_RECORD:
+ clientQuotaControlManager.replay((ClientQuotaRecord) message);
+ break;
+ case PRODUCER_IDS_RECORD:
+ producerIdControlManager.replay((ProducerIdsRecord) message);
+ break;
+ case BROKER_REGISTRATION_CHANGE_RECORD:
+ clusterControl.replay((BrokerRegistrationChangeRecord)
message);
+ break;
+ case ACCESS_CONTROL_ENTRY_RECORD:
+ aclControlManager.replay((AccessControlEntryRecord) message,
snapshotId);
+ break;
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+ aclControlManager.replay((RemoveAccessControlEntryRecord)
message, snapshotId);
+ break;
+ case NO_OP_RECORD:
+ // NoOpRecord is an empty record and doesn't need to be
replayed
+ break;
+ default:
+ throw new RuntimeException("Unhandled record type " + type);
}
}
@@ -1344,8 +1394,24 @@ public final class QuorumController implements
Controller {
updateLastCommittedState(-1, -1, -1);
}
+ /**
+ * Handles faults that should normally be fatal to the process.
+ */
+ private final FaultHandler fatalFaultHandler;
+
+ /**
+ * Handles faults in metadata handling that are normally not fatal.
+ */
+ private final FaultHandler metadataFaultHandler;
+
+ /**
+ * The slf4j log context, used to create new loggers.
+ */
private final LogContext logContext;
+ /**
+ * The slf4j logger.
+ */
private final Logger log;
/**
@@ -1530,28 +1596,34 @@ public final class QuorumController implements
Controller {
private final BootstrapMetadata bootstrapMetadata;
- private QuorumController(LogContext logContext,
- int nodeId,
- String clusterId,
- KafkaEventQueue queue,
- Time time,
- KafkaConfigSchema configSchema,
- RaftClient<ApiMessageAndVersion> raftClient,
- QuorumFeatures quorumFeatures,
- short defaultReplicationFactor,
- int defaultNumPartitions,
- ReplicaPlacer replicaPlacer,
- long snapshotMaxNewRecordBytes,
- OptionalLong leaderImbalanceCheckIntervalNs,
- OptionalLong maxIdleIntervalNs,
- long sessionTimeoutNs,
- ControllerMetrics controllerMetrics,
- Optional<CreateTopicPolicy> createTopicPolicy,
- Optional<AlterConfigPolicy> alterConfigPolicy,
- ConfigurationValidator configurationValidator,
- Optional<ClusterMetadataAuthorizer> authorizer,
- Map<String, Object> staticConfig,
- BootstrapMetadata bootstrapMetadata) {
+ private QuorumController(
+ FaultHandler fatalFaultHandler,
+ FaultHandler metadataFaultHandler,
+ LogContext logContext,
+ int nodeId,
+ String clusterId,
+ KafkaEventQueue queue,
+ Time time,
+ KafkaConfigSchema configSchema,
+ RaftClient<ApiMessageAndVersion> raftClient,
+ QuorumFeatures quorumFeatures,
+ short defaultReplicationFactor,
+ int defaultNumPartitions,
+ ReplicaPlacer replicaPlacer,
+ long snapshotMaxNewRecordBytes,
+ OptionalLong leaderImbalanceCheckIntervalNs,
+ OptionalLong maxIdleIntervalNs,
+ long sessionTimeoutNs,
+ ControllerMetrics controllerMetrics,
+ Optional<CreateTopicPolicy> createTopicPolicy,
+ Optional<AlterConfigPolicy> alterConfigPolicy,
+ ConfigurationValidator configurationValidator,
+ Optional<ClusterMetadataAuthorizer> authorizer,
+ Map<String, Object> staticConfig,
+ BootstrapMetadata bootstrapMetadata
+ ) {
+ this.fatalFaultHandler = fatalFaultHandler;
+ this.metadataFaultHandler = metadataFaultHandler;
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
new file mode 100644
index 0000000000..c57ce46fb3
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A fault that we encountered while we replayed cluster metadata.
+ */
+public class MetadataFaultException extends RuntimeException {
+ public MetadataFaultException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public MetadataFaultException(String message) {
+ super(message);
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
new file mode 100644
index 0000000000..e9f71b80e6
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+ private static final Logger log =
LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+ @Override
+ public void handleFault(String failureMessage, Throwable cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ throw new MetadataFaultException("Encountered metadata fault: " +
failureMessage, cause);
+ }
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f..5e395cebcb 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigResource;
@@ -1181,6 +1182,30 @@ public class QuorumControllerTest {
}
}
+ @Test
+ public void testFatalMetadataReplayErrorOnActive() throws Throwable {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3,
Optional.empty())) {
+ try (QuorumControllerTestEnv controlEnv = new
QuorumControllerTestEnv(logEnv, b -> {
+ })) {
+ QuorumController active = controlEnv.activeController();
+ CompletableFuture<Void> future =
active.appendWriteEvent("errorEvent",
+ OptionalLong.empty(), () -> {
+ return
ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
+ new ConfigRecord().
+ setName(null).
+ setResourceName(null).
+ setResourceType((byte) 255).
+ setValue(null), (short) 0)), null);
+ });
+ assertThrows(ExecutionException.class, () -> future.get());
+ assertEquals(NullPointerException.class,
+
controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+ controlEnv.fatalFaultHandler().setIgnore(true);
+ controlEnv.metadataFaultHandler().setIgnore(true);
+ }
+ }
+ }
+
private static void
assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
for (int i = 0; i < authorizers.size(); i++) {
assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4cc45a9774..40dd21c88d 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -34,6 +34,7 @@ import org.apache.kafka.controller.QuorumController.Builder;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable
{
private final List<QuorumController> controllers;
private final LocalLogManagerTestEnv logEnv;
+ private final MockFaultHandler fatalFaultHandler = new
MockFaultHandler("fatalFaultHandler");
+ private final MockFaultHandler metadataFaultHandler = new
MockFaultHandler("metadataFaultHandler");
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
@@ -84,6 +87,8 @@ public class QuorumControllerTestEnv implements AutoCloseable
{
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout,
TimeUnit.MILLISECONDS));
});
+ builder.setFatalFaultHandler(fatalFaultHandler);
+ builder.setMetadataFaultHandler(metadataFaultHandler);
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
@@ -117,6 +122,14 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
return controllers;
}
+ public MockFaultHandler fatalFaultHandler() {
+ return fatalFaultHandler;
+ }
+
+ public MockFaultHandler metadataFaultHandler() {
+ return metadataFaultHandler;
+ }
+
@Override
public void close() throws InterruptedException {
for (QuorumController controller : controllers) {
@@ -125,5 +138,7 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
for (QuorumController controller : controllers) {
controller.close();
}
+ fatalFaultHandler.maybeRethrowFirstException();
+ metadataFaultHandler.maybeRethrowFirstException();
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
new file mode 100644
index 0000000000..4c03eacc32
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Handle a server fault.
+ */
+public interface FaultHandler {
+ /**
+ * Handle a fault.
+ *
+ * @param failureMessage The failure message to log.
+ */
+ default void handleFault(String failureMessage) {
+ handleFault(failureMessage, null);
+ }
+
+ /**
+ * Handle a fault.
+ *
+ * @param failureMessage The failure message to log.
+ * @param cause The exception that caused the problem, or
null.
+ */
+ void handleFault(String failureMessage, Throwable cause);
+
+ /**
+ * Log a failure message about a fault.
+ *
+ * @param log The slf4j logger.
+ * @param failureMessage The failure message.
+ * @param cause The exception which caused the failure, or
null.
+ */
+ static void logFailureMessage(Logger log, String failureMessage, Throwable
cause) {
+ if (cause == null) {
+ log.error("Encountered fatal fault: {}", failureMessage);
+ } else {
+ log.error("Encountered fatal fault: {}", failureMessage, cause);
+ }
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
new file mode 100644
index 0000000000..e3b9f25a3b
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.utils.Exit;
+
+
+/**
+ * This is a fault handler which exits the Java process.
+ */
+public class ProcessExitingFaultHandler implements FaultHandler {
+ private static final Logger log =
LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
+
+ @Override
+ public void handleFault(String failureMessage, Throwable cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ Exit.exit(1);
+ }
+}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
new file mode 100644
index 0000000000..39b3ed0784
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a fault handler suitable for use in JUnit tests. It will store the
exception created by the first
+ * call to handleFault that was made.
+ */
+public class MockFaultHandler implements FaultHandler {
+ private static final Logger log =
LoggerFactory.getLogger(MockFaultHandler.class);
+
+ private final String name;
+ private MockFaultHandlerException firstException = null;
+ private boolean ignore = false;
+
+ public MockFaultHandler(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized void handleFault(String failureMessage, Throwable
cause) {
+ FaultHandler.logFailureMessage(log, failureMessage, cause);
+ MockFaultHandlerException e = (cause == null) ?
+ new MockFaultHandlerException(name + ": " + failureMessage) :
+ new MockFaultHandlerException(name + ": " + failureMessage +
+ ": " + cause.getMessage(), cause);
+ if (firstException == null) {
+ firstException = e;
+ }
+ throw e;
+ }
+
+ public synchronized void maybeRethrowFirstException() {
+ if (firstException != null && !ignore) {
+ throw firstException;
+ }
+ }
+
+ public synchronized MockFaultHandlerException firstException() {
+ return firstException;
+ }
+
+ public synchronized void setIgnore(boolean ignore) {
+ this.ignore = ignore;
+ }
+}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
new file mode 100644
index 0000000000..ef9b11bdeb
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+
+/**
+ * An exception thrown by MockFaultHandler.
+ */
+public class MockFaultHandlerException extends RuntimeException {
+ public MockFaultHandlerException(String failureMessage, Throwable cause) {
+ super(failureMessage, cause);
+ // If a cause exception was provided, set our stack trace to its
stack trace. This is
+ // useful in junit tests where a limited number of stack frames are
printed, and usually
+ // the stack frames of cause exceptions get trimmed.
+ if (cause != null) {
+ setStackTrace(cause.getStackTrace());
+ }
+ }
+
+ public MockFaultHandlerException(String failureMessage) {
+ this(failureMessage, null);
+ }
+}