This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4d8552 KAFKA-6526: Enable unclean leader election without controller
change (#4920)
b4d8552 is described below
commit b4d8552218c9ab41bc1e0221c4e79417a2662d19
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue May 1 15:27:02 2018 +0100
KAFKA-6526: Enable unclean leader election without controller change (#4920)
Enable dynamic update of default unclean leader election config of brokers.
A new controller event has been added to process unclean leader election when
the config is enabled dynamically.
Reviewers: Dong Lin <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../scala/kafka/controller/ControllerState.scala | 6 ++-
.../scala/kafka/controller/KafkaController.scala | 14 +++++
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 ++-
.../server/DynamicBrokerReconfigurationTest.scala | 62 +++++++++++++++++++++-
4 files changed, 85 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala
b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af777..d247305 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
def value = 12
}
+ case object UncleanLeaderElectionEnable extends ControllerState {
+ def value = 13
+ }
+
val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange,
TopicChange, TopicDeletion,
PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance,
ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
- LogDirChange, ControllerShutdown)
+ LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index eee625e..bc721e39 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
zkClient.updateBrokerInfoInZk(newBrokerInfo)
}
+ private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
+ eventManager.put(UncleanLeaderElectionEnable)
+ }
+
private def state: ControllerState = eventManager.state
/**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient:
KafkaZkClient, time: Ti
}
}
+ case object UncleanLeaderElectionEnable extends ControllerEvent {
+
+ def state = ControllerState.UncleanLeaderElectionEnable
+
+ override def process(): Unit = {
+ if (!isActive) return
+ partitionStateMachine.triggerOnlinePartitionStateChange()
+ }
+ }
+
case class ControlledShutdown(id: Int, controlledShutdownCallback:
Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index be0ed6b..004b531 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
- addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+ addReconfigurable(new DynamicLogConfig(kafkaServer.logManager,
kafkaServer))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId,
kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId,
kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet --
ExcludedConfigs
val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case
(k, v) => (v, k) }
}
-class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with
Logging {
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends
Reconfigurable with Logging {
override def configure(configs: util.Map[String, _]): Unit = {}
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends
Reconfigurable with Loggi
override def reconfigure(configs: util.Map[String, _]): Unit = {
val currentLogConfig = logManager.currentDefaultConfig
+ val origUncleanLeaderElectionEnable =
logManager.currentDefaultConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String,
Object](currentLogConfig.originals)
configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach
{ case (k, v) =>
if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends
Reconfigurable with Loggi
val logConfig = LogConfig(props.asJava)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
+ if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable &&
!origUncleanLeaderElectionEnable) {
+ server.kafkaController.enableDefaultUncleanLeaderElection()
+ }
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 3c6c039..8b70875 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -43,7 +43,7 @@ import
org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord,
ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
Reconfigurable, TopicPartition}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener,
Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
@@ -430,6 +430,62 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
}
@Test
+ def testUncleanLeaderElectionEnable(): Unit = {
+ val topic = "testtopic2"
+ TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+ val producer = ProducerBuilder().maxRetries(1000).acks(1).build()
+ val consumer =
ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+
+ def partitionInfo: TopicPartitionInfo =
+
adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+
+ val partitionInfo0 = partitionInfo
+ assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
+ val leaderBroker = servers.find(_.config.brokerId ==
partitionInfo0.replicas.get(0).id).get
+ val followerBroker = servers.find(_.config.brokerId ==
partitionInfo0.replicas.get(1).id).get
+
+ // Stop follower
+ followerBroker.shutdown()
+ followerBroker.awaitShutdown()
+
+ // Produce and consume some messages when the only follower is down, this
should succeed since MinIsr is 1
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+
+ // Shutdown leader and startup follower
+ leaderBroker.shutdown()
+ leaderBroker.awaitShutdown()
+ followerBroker.startup()
+ val controller = servers.find(_.config.brokerId ==
TestUtils.waitUntilControllerElected(zkClient)).get
+
+ // Verify that new leader is not elected with unclean leader disabled
since there are no ISRs
+ TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean
leader elected")
+
+ // Enable unclean leader election
+ val newProps = new Properties
+ newProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+ TestUtils.alterConfigs(servers, adminClients.head, newProps,
perBrokerConfig = false).all.get
+ waitForConfigOnServer(controller,
KafkaConfig.UncleanLeaderElectionEnableProp, "true")
+
+ // Verify that the old follower with missing records is elected as the new
leader
+ val (newLeader, elected) =
TestUtils.computeUntilTrue(partitionInfo.leader)(leader => leader != null)
+ assertTrue("Unclean leader not elected", elected)
+ assertEquals(followerBroker.config.brokerId, newLeader.id)
+
+ // New leader doesn't have the last 10 records committed on the old leader
that have already been consumed.
+ // With unclean leader election enabled, we should be able to produce to
the new leader. The first 10 records
+ // produced will not be consumed since they have offsets less than the
consumer's committed offset.
+ // Next 10 records produced should be consumed.
+ (1 to 10).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+ .map(producer.send)
+ .map(_.get(10, TimeUnit.SECONDS))
+ verifyProduceConsume(producer, consumer, numRecords = 10, topic)
+ consumer.commitSync()
+ }
+
+ @Test
def testThreadPoolResize(): Unit = {
val requestHandlerPrefix = "kafka-request-handler-"
val networkThreadPrefix = "kafka-network-thread-"
@@ -1272,12 +1328,14 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
private case class ProducerBuilder() extends
ClientBuilder[KafkaProducer[String, String]] {
private var _retries = 0
+ private var _acks = -1
def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this
}
+ def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
override def build(): KafkaProducer[String, String] = {
val producer = TestUtils.createNewProducer(bootstrapServers,
- acks = -1,
+ acks = _acks,
retries = _retries,
securityProtocol = _securityProtocol,
trustStoreFile = Some(trustStoreFile1),
--
To stop receiving notification emails like this one, please contact
[email protected].