[ https://issues.apache.org/jira/browse/KAFKA-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420251#comment-15420251 ]
Alexey Ozeritskiy commented on KAFKA-3924:
------------------------------------------

IMHO the simplest way to solve the problem is to execute System.exit asynchronously:
{code}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ef602e4..ed00a73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -175,10 +175,13 @@ class ReplicaFetcherThread(name: String,
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
+        val msg = "Exiting because log truncation is not allowed for partition %s,".format(topicAndPartition) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
-        System.exit(1)
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)
+        fatal(msg)
+
+        replicaMgr.scheduler.schedule("exit", () => System.exit(1))
+        throw new Exception(msg)
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..6e6539b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -105,7 +105,7 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time,
                      jTime: JTime,
                      val zkUtils: ZkUtils,
-                     scheduler: Scheduler,
+                     val scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
{code}
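For context on why the exit needs to be asynchronous: the attached deadlock-stack presumably shows the fetcher thread stuck inside System.exit, since exit runs the JVM shutdown hooks and the broker's shutdown hook in turn waits for the fetcher threads to stop. Below is a minimal standalone sketch of that idea, not Kafka code; the object and method names are invented for illustration. It hands System.exit to a helper thread and lets the calling thread unwind via an exception, the same shape as the scheduler.schedule("exit", ...) plus throw in the patch above.
{code}
import java.util.concurrent.Executors

// Standalone sketch, not Kafka code: why scheduling System.exit on another thread helps.
// System.exit runs the JVM shutdown hooks; if a hook joins the very thread that called
// System.exit (as the broker's shutdown hook waits for fetcher threads), that exit call
// never returns. Handing exit() to a helper thread and unwinding the caller breaks the cycle.
object AsyncExitSketch {
  private val exitExecutor = Executors.newSingleThreadExecutor()

  // Rough analogue of the patched fatal path in ReplicaFetcherThread (hypothetical name).
  def fatalAndExitAsync(msg: String): Nothing = {
    exitExecutor.submit(new Runnable {
      override def run(): Unit = System.exit(1) // runs shutdown hooks off the caller thread
    })
    throw new IllegalStateException(msg) // caller unwinds and can be joined by the hook
  }

  def main(args: Array[String]): Unit = {
    val fetcher = new Thread(new Runnable {
      override def run(): Unit =
        try fatalAndExitAsync("log truncation is not allowed")
        catch { case e: Exception => println("fetcher unwinding: " + e.getMessage) }
    }, "fetcher-sketch")

    // Shutdown hook that waits for the "fetcher", mimicking the broker's shutdown hook.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = { fetcher.join(); println("shutdown hook completed") }
    }))

    fetcher.start()
    fetcher.join()
  }
}
{code}
Exposing scheduler as a val in ReplicaManager (second hunk) is just what lets ReplicaFetcherThread reach an existing scheduler thread for this instead of spinning up its own.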
> Data loss due to halting when LEO is larger than leader's LEO
> --------------------------------------------------------------
>
>                 Key: KAFKA-3924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3924
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.0
>            Reporter: Maysam Yabandeh
>             Fix For: 0.10.0.1
>
>         Attachments: deadlock-stack
>
>
> Currently the follower broker panics when its LEO is larger than its leader's LEO
> and, assuming that this is an impossible state to reach, halts the process to
> prevent any further damage.
> {code}
>     if (leaderEndOffset < replica.logEndOffset.messageOffset) {
>       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
>       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
>       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
>       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
>         ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
>         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
>         fatal("...")
>         Runtime.getRuntime.halt(1)
>       }
> {code}
> Firstly, this assumption is invalid and there are legitimate cases (examples below)
> in which this state can actually occur. Secondly, halting results in the broker
> losing its un-flushed data, and if multiple brokers halt simultaneously there is a
> chance that both the leader and the followers of a partition are among the halted
> brokers, which would result in permanent data loss.
> Given that this is a legitimate case, we suggest replacing the halt with a graceful
> shutdown to avoid propagating the data loss to the entire cluster.
> Details:
> One legitimate case in which this can occur is when a troubled broker shrinks its
> partitions right before crashing (KAFKA-3410 and KAFKA-3861). In this case the
> broker has lost some data, but the controller still cannot elect the others as the
> leader. If the crashed broker comes back up, the controller elects it as the leader,
> and as a result all the other brokers, which are now following it, halt since their
> LEOs are larger than those of the shrunk topics on the restarted broker. We actually
> had a case where bringing up a crashed broker simultaneously took down the entire
> cluster, and as explained above this could result in data loss.
> The other legitimate case is when multiple brokers ungracefully shut down at the
> same time. In this case both the leader and the followers lose their un-flushed
> data, but one of them has an HW larger than the other's. The controller elects the
> one that comes back up sooner as the leader, and if its LEO is less than its future
> follower's, the follower will halt (and probably lose more data). Simultaneous
> ungraceful shutdowns could happen due to hardware issues (e.g., rack power failure),
> operator errors, or software issues (e.g., the case above, which is further
> explained in KAFKA-3410 and KAFKA-3861 and causes simultaneous halts in multiple
> brokers).
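Regarding the halt vs. graceful-shutdown point in the quoted description: Runtime.getRuntime.halt terminates the JVM without running shutdown hooks, whereas System.exit runs them first, which is where a broker would get its chance to flush and shut down cleanly. A minimal standalone sketch of that JVM-level difference (illustrative names only, not Kafka code):
{code}
// Standalone sketch of the JVM-level difference the description relies on:
// halt() skips shutdown hooks entirely, exit() runs them before terminating.
object HaltVsExitSketch {
  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      // In a broker, cleanup such as flushing logs would be triggered from here.
      override def run(): Unit = println("shutdown hook ran")
    }))
    if (args.headOption.contains("halt"))
      Runtime.getRuntime.halt(1) // hook above never runs
    else
      System.exit(1)             // hook above runs before the JVM exits
  }
}
{code}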