[ https://issues.apache.org/jira/browse/KAFKA-6573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371644#comment-16371644 ]
ASF GitHub Bot commented on KAFKA-6573: --------------------------------------- hachikuji closed pull request #4603: KAFKA-6573: Update brokerInfo in KafkaController on listener update URL: https://github.com/apache/kafka/pull/4603 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f36fc798ad6..a8707ad887d 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -190,6 +190,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti eventManager.put(controlledShutdownEvent) } + private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = { + this.brokerInfo = newBrokerInfo + zkClient.updateBrokerInfoInZk(newBrokerInfo) + } + private def state: ControllerState = eventManager.state /** diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ce4b9e75b7c..3236af01bbb 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -751,7 +751,7 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded) - server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo) + server.kafkaController.updateBrokerInfo(server.createBrokerInfo) } private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] = diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 
afc8202b88a..6545fde30e9 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException.{Code, NodeExistsException} import org.apache.zookeeper.data.{ACL, Stat} -import org.apache.zookeeper.{CreateMode, KeeperException} +import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper} import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, mutable} @@ -61,6 +61,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean import KafkaZkClient._ + // Only for testing + private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper + /** * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect * and a monotonically increasing number will be appended to its name. 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index 3934fd0ad5d..efbd6e898a8 100644 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -317,7 +317,7 @@ class ZooKeeperClient(connectString: String, } // Only for testing - private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) { + private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) { zooKeeper } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index ee3876231b7..f0bd61a0ccc 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -599,6 +599,26 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val invalidHost = "192.168.0.1" alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost) + def validateEndpointsInZooKeeper(server: KafkaServer, endpointMatcher: String => Boolean): Unit = { + val brokerInfo = zkClient.getBroker(server.config.brokerId) + assertTrue("Broker not registered", brokerInfo.nonEmpty) + val endpoints = brokerInfo.get.endPoints.toString + assertTrue(s"Endpoint update not saved $endpoints", endpointMatcher(endpoints)) + } + + // Verify that endpoints have been updated in ZK for all brokers + servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost))) + + // Trigger session expiry and ensure that controller registers new advertised listener after expiry + val controllerEpoch = zkClient.getControllerEpoch + val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))) + val 
controllerZkClient = controllerServer.zkClient + val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper) + sessionExpiringClient.close() + TestUtils.waitUntilTrue(() => zkClient.getControllerEpoch != controllerEpoch, + "Controller not re-elected after ZK session expiry") + TestUtils.retry(10000)(validateEndpointsInZooKeeper(controllerServer, endpoints => endpoints.contains(invalidHost))) + // Verify that producer connections fail since advertised listener is invalid val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap) @@ -606,6 +626,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val sendFuture = verifyConnectionFailure(producer1) alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost") + servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost))) // Verify that produce/consume work now val producer = createProducer(trustStoreFile1, retries = 0) diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 9e7258315d8..a1222977751 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.consumer.internals.AbstractCoordinator import kafka.controller.ControllerEventManager import org.apache.kafka.common.utils.Time +import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper} @Category(Array(classOf[IntegrationTest])) abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { @@ -67,6 +68,18 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { 
CoreUtils.swallow(zookeeper.shutdown(), this) Configuration.setConfiguration(null) } + + // Trigger session expiry by reusing the session id in another client + def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = { + val dummyWatcher = new Watcher { + override def process(event: WatchedEvent): Unit = {} + } + val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher, + zooKeeper.getSessionId, + zooKeeper.getSessionPasswd) + assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works + anotherZkClient + } } object ZooKeeperTestHarness { diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index f1c09d7308d..2e0651c9991 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException.{Code, NoNodeException} import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState} import org.apache.zookeeper.ZooKeeper.States -import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper} -import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue} +import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs} +import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertTrue} import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ @@ -456,14 +456,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { requestThread.start() sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests - // Trigger session expiry by reusing the session id in another client - val dummyWatcher = new Watcher { - override def process(event: WatchedEvent): Unit = {} - } - val anotherZkClient = new ZooKeeper(zkConnect, 
1000, dummyWatcher, - zooKeeperClient.currentZooKeeper.getSessionId, - zooKeeperClient.currentZooKeeper.getSessionPasswd) - assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works + val anotherZkClient = createZooKeeperClientToTriggerSessionExpiry(zooKeeperClient.currentZooKeeper) sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail anotherZkClient.close() sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaController.brokerInfo not updated on dynamic update > -------------------------------------------------------- > > Key: KAFKA-6573 > URL: https://issues.apache.org/jira/browse/KAFKA-6573 > Project: Kafka > Issue Type: Bug > Components: controller > Affects Versions: 1.1.0 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 1.1.0 > > > KafkaController.brokerInfo is cached in-memory and used to re-register the > broker in ZooKeeper if the ZK session expires. It should be kept up-to-date if > listeners are dynamically updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)