[
https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420251#comment-15420251
]
Alexey Ozeritskiy commented on KAFKA-3924:
------------------------------------------
IMHO the simplest way to solve the problem is to execute System.exit
asynchronously:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ef602e4..ed00a73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -175,10 +175,13 @@ class ReplicaFetcherThread(name: String,
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
+        val msg = "Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
-        System.exit(1)
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)
+        fatal(msg)
+
+        replicaMgr.scheduler.schedule("exit", () => System.exit(1))
+        throw new Exception(msg)
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..6e6539b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -105,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time,
                      jTime: JTime,
                      val zkUtils: ZkUtils,
-                     scheduler: Scheduler,
+                     val scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
{code}
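The key point of the patch is that the fetcher thread itself no longer calls System.exit: the exit is handed to the ReplicaManager's scheduler and the fetcher just throws and unwinds, which (judging by the attached deadlock-stack) keeps the fetcher from blocking inside System.exit while the shutdown sequence tries to join it. A standalone sketch of the same pattern, outside Kafka's classes (AsyncExitSketch, fatalExit and the plain ExecutorService below are illustrative stand-ins for the scheduler, not Kafka code):
{code}
import java.util.concurrent.Executors

object AsyncExitSketch {
  // A single background thread standing in for Kafka's Scheduler in this sketch.
  private val exitExecutor = Executors.newSingleThreadExecutor()

  def fatalExit(msg: String): Nothing = {
    // Hand the JVM exit to another thread so the calling (fetcher) thread
    // stays free to unwind and be joined by the shutdown sequence.
    exitExecutor.submit(new Runnable {
      override def run(): Unit = System.exit(1)
    })
    // Throwing lets the fetcher leave its work loop immediately instead of
    // waiting inside System.exit for the shutdown hooks to complete.
    throw new IllegalStateException(msg)
  }
}
{code}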
> Data loss due to halting when LEO is larger than leader's LEO
> -------------------------------------------------------------
>
> Key: KAFKA-3924
> URL: https://issues.apache.org/jira/browse/KAFKA-3924
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.0.0
> Reporter: Maysam Yabandeh
> Fix For: 0.10.0.1
>
> Attachments: deadlock-stack
>
>
> Currently the follower broker panics when its LEO is larger than its leader's
> LEO and, assuming that this is an impossible state to reach, halts the
> process to prevent any further damage.
> {code}
>       if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>         // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
>         // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
>         // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
>         if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>           ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>           // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
>           fatal("...")
>           Runtime.getRuntime.halt(1)
>         }
> {code}
> Firstly, this assumption is invalid and there are legitimate cases (examples
> below) in which this state can actually occur. Secondly, halting causes the
> broker to lose its un-flushed data, and if multiple brokers halt
> simultaneously there is a chance that both the leader and the followers of a
> partition are among the halted brokers, which would result in permanent data
> loss.
> Given that this is a legitimate case, we suggest replacing the halt with a
> graceful shutdown to avoid propagating data loss to the entire cluster.
> Details:
> One legitimate case in which this can occur is when a troubled broker shrinks
> the ISR of its partitions right before crashing (KAFKA-3410 and KAFKA-3861).
> In this case the broker has lost some data, but the controller still cannot
> elect any of the other replicas as the leader. If the crashed broker comes
> back up, the controller elects it as the leader, and as a result all the
> other brokers that now follow it halt, since their LEOs are larger than those
> of the shrunk partitions on the restarted broker. We actually had a case
> where bringing up a crashed broker took down the entire cluster at once, and
> as explained above this could result in data loss.
> The other legitimate case is when multiple brokers shut down ungracefully at
> the same time. In this case both the leader and the followers lose their
> un-flushed data, but one of them ends up with a larger HW than the other. The
> controller elects whichever broker comes back up sooner as the leader, and if
> its LEO is less than its future follower's, the follower will halt (and
> probably lose more data). Simultaneous ungraceful shutdowns can happen due to
> hardware issues (e.g., a rack power failure), operator errors, or software
> issues (e.g., the case above, further explained in KAFKA-3410 and KAFKA-3861,
> which causes simultaneous halts in multiple brokers).
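For concreteness on the halt-versus-graceful-shutdown point in the description above: Runtime.getRuntime.halt terminates the JVM without running shutdown hooks, whereas System.exit does run them, which is what gives the broker's shutdown path a chance to flush before the process dies. A minimal standalone sketch (the hook body is only a stand-in for the broker's real shutdown logic):
{code}
object HaltVsExitSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the broker's shutdown hook, which would normally perform
    // a controlled shutdown and flush dirty log segments to disk.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = println("shutdown hook: flushing logs")
    }))

    if (args.headOption.contains("halt"))
      Runtime.getRuntime.halt(1)  // hooks are skipped: nothing gets flushed
    else
      System.exit(1)              // hooks run: the flush above gets a chance
  }
}
{code}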