This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de2e693 KAFKA-10270: A broker to controller channel manager (#9012)
de2e693 is described below
commit de2e6938c8648f02254a645a8fff9c2fa8364ef1
Author: Boyang Chen <[email protected]>
AuthorDate: Wed Jul 29 11:40:14 2020 -0700
KAFKA-10270: A broker to controller channel manager (#9012)
Add a broker to controller channel manager for use cases such as
redirection and AlterIsr.
Reviewers: David Arthur <[email protected]>, Colin P. McCabe
<[email protected]>, Ismael Juma <[email protected]>
Co-authored-by: Viktor Somogyi <[email protected]>
Co-authored-by: Boyang Chen <[email protected]>
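[Editor's note] For context, the new manager is built from the broker's metadata
cache and exposes an asynchronous sendRequest API. A minimal caller sketch,
written against the API added in this patch (hypothetical; not part of the commit,
and sendRequest is private[server], so callers live in the kafka.server package):

    val channelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix)
    channelManager.start()  // spawns the single background request thread

    // Enqueue a request; the callback fires once the controller responds.
    channelManager.sendRequest(
      new MetadataRequest.Builder(new MetadataRequestData()),
      response => println(s"controller responded: ${response.responseBody()}"))

    channelManager.shutdown()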
---
.../kafka/clients/ManualMetadataUpdater.java | 2 +-
.../scala/kafka/common/InterBrokerSendThread.scala | 16 +-
.../server/BrokerToControllerChannelManager.scala | 187 ++++++++++++++++++++
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +
.../main/scala/kafka/server/MetadataCache.scala | 3 +-
.../kafka/common/InterBrokerSendThreadTest.scala | 26 +--
.../BrokerToControllerRequestThreadTest.scala | 193 +++++++++++++++++++++
7 files changed, 413 insertions(+), 18 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index c1c1fba..3d51549 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -39,7 +39,7 @@ public class ManualMetadataUpdater implements MetadataUpdater {
private List<Node> nodes;
public ManualMetadataUpdater() {
- this(new ArrayList<Node>(0));
+ this(new ArrayList<>(0));
}
public ManualMetadataUpdater(List<Node> nodes) {
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 3eff03d..11e1aa8 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -20,7 +20,7 @@ import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Itera
import java.util.Map.Entry
import kafka.utils.ShutdownableThread
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, KafkaClient, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.AuthenticationException
import org.apache.kafka.common.internals.FatalExitError
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
* Class for inter-broker send thread that utilize a non-blocking network client.
*/
abstract class InterBrokerSendThread(name: String,
- networkClient: NetworkClient,
+ networkClient: KafkaClient,
time: Time,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
@@ -57,8 +57,13 @@ abstract class InterBrokerSendThread(name: String,
generateRequests().foreach { request =>
val completionHandler = request.handler
unsentRequests.put(request.destination,
-      networkClient.newClientRequest(request.destination.idString, request.request, now, true,
-        requestTimeoutMs, completionHandler))
+ networkClient.newClientRequest(
+ request.destination.idString,
+ request.request,
+ now,
+ true,
+ requestTimeoutMs,
+ completionHandler))
}
try {
@@ -138,7 +143,8 @@ abstract class InterBrokerSendThread(name: String,
def wakeup(): Unit = networkClient.wakeup()
}
-case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest],
+case class RequestAndCompletionHandler(destination: Node,
+                                       request: AbstractRequest.Builder[_ <: AbstractRequest],
                                        handler: RequestCompletionHandler)
private class UnsentRequests {
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
new file mode 100644
index 0000000..de092cc
--- /dev/null
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache,
+                                       time: Time,
+                                       metrics: Metrics,
+                                       config: KafkaConfig,
+                                       threadNamePrefix: Option[String] = None) extends Logging {
+  private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]
+  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+ private val manualMetadataUpdater = new ManualMetadataUpdater()
+ private val requestThread = newRequestThread
+
+ def start(): Unit = {
+ requestThread.start()
+ }
+
+ def shutdown(): Unit = {
+ requestThread.shutdown()
+ requestThread.awaitShutdown()
+ }
+
+ private[server] def newRequestThread = {
+    val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+ val networkClient = {
+ val channelBuilder = ChannelBuilders.clientChannelBuilder(
+ brokerToControllerSecurityProtocol,
+ JaasContext.Type.SERVER,
+ config,
+ brokerToControllerListenerName,
+ config.saslMechanismInterBrokerProtocol,
+ time,
+ config.saslInterBrokerHandshakeRequestEnable,
+ logContext
+ )
+ val selector = new Selector(
+ NetworkReceive.UNLIMITED,
+ Selector.NO_IDLE_TIMEOUT_MS,
+ metrics,
+ time,
+ "BrokerToControllerChannel",
+ Map("BrokerId" -> config.brokerId.toString).asJava,
+ false,
+ channelBuilder,
+ logContext
+ )
+ new NetworkClient(
+ selector,
+ manualMetadataUpdater,
+ config.brokerId.toString,
+ 1,
+ 0,
+ 0,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ config.requestTimeoutMs,
+ config.connectionSetupTimeoutMs,
+ config.connectionSetupTimeoutMaxMs,
+ ClientDnsLookup.USE_ALL_DNS_IPS,
+ time,
+ false,
+ new ApiVersions,
+ logContext
+ )
+ }
+ val threadName = threadNamePrefix match {
+ case None => s"broker-${config.brokerId}-to-controller-send-thread"
+      case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread"
+ }
+
+    new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue, metadataCache, config,
+      brokerToControllerListenerName, time, threadName)
+ }
+
+  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                  callback: RequestCompletionHandler): Unit = {
+ requestQueue.put(BrokerToControllerQueueItem(request, callback))
+ }
+}
+
+case class BrokerToControllerQueueItem(request: AbstractRequest.Builder[_ <: AbstractRequest],
+                                       callback: RequestCompletionHandler)
+
+class BrokerToControllerRequestThread(networkClient: KafkaClient,
+ metadataUpdater: ManualMetadataUpdater,
+                                      requestQueue: LinkedBlockingDeque[BrokerToControllerQueueItem],
+                                      metadataCache: kafka.server.MetadataCache,
+ config: KafkaConfig,
+ listenerName: ListenerName,
+ time: Time,
+ threadName: String)
+  extends InterBrokerSendThread(threadName, networkClient, time, isInterruptible = false) {
+
+ private var activeController: Option[Node] = None
+
+ override def requestTimeoutMs: Int = config.controllerSocketTimeoutMs
+
+ override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
+ val requestsToSend = new mutable.Queue[RequestAndCompletionHandler]
+ val topRequest = requestQueue.poll()
+ if (topRequest != null) {
+ val request = RequestAndCompletionHandler(
+ activeController.get,
+ topRequest.request,
+ handleResponse(topRequest),
+ )
+ requestsToSend.enqueue(request)
+ }
+ requestsToSend
+ }
+
+  private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+ if (response.wasDisconnected()) {
+ activeController = None
+ requestQueue.putFirst(request)
+    } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+      // just close the controller connection and wait for metadata cache update in doWork
+ networkClient.close(activeController.get.idString)
+ activeController = None
+ requestQueue.putFirst(request)
+ } else {
+ request.callback.onComplete(response)
+ }
+ }
+
+ private[server] def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
+
+ override def doWork(): Unit = {
+ if (activeController.isDefined) {
+ super.doWork()
+ } else {
+ debug("Controller isn't cached, looking for local metadata changes")
+      val controllerOpt = metadataCache.getControllerId.flatMap(metadataCache.getAliveBroker)
+      if (controllerOpt.isDefined) {
+        if (activeController.isEmpty || activeController.exists(_.id != controllerOpt.get.id))
+          info(s"Recorded new controller, from now on will use broker ${controllerOpt.get.id}")
+        activeController = Option(controllerOpt.get.node(listenerName))
+        metadataUpdater.setNodes(metadataCache.getAliveBrokers.map(_.node(listenerName)).asJava)
+      } else {
+        // need to backoff to avoid tight loops
+        debug("No controller defined in metadata cache, retrying after backoff")
+ backoff()
+ }
+ }
+ }
+}
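[Editor's note] A note on the design above: the NetworkClient is created with
maxInFlightRequestsPerConnection = 1, and a failed request is requeued at the
head of the deque via putFirst, so callers observe responses in submission order
even across controller failover. A tiny standalone sketch of that requeue
property (illustrative only; plain strings stand in for queue items):

    import java.util.concurrent.LinkedBlockingDeque

    val queue = new LinkedBlockingDeque[String]()
    queue.put("A"); queue.put("B")
    val inFlight = queue.poll()   // thread sends A; controller replies NOT_CONTROLLER
    queue.putFirst(inFlight)      // A goes back to the head, ahead of B
    assert(queue.peek() == "A")   // A is retried against the new controller before B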
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 86b841f..c3ab250 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
var kafkaController: KafkaController = null
+ var brokerToControllerChannelManager: BrokerToControllerChannelManager = null
+
var kafkaScheduler: KafkaScheduler = null
var metadataCache: MetadataCache = null
@@ -314,6 +316,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
        kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
kafkaController.startup()
+    brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix)
+
        adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index bdc1be9..79d84ea 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -18,7 +18,7 @@
package kafka.server
import java.util
-import java.util.{Collections}
+import java.util.Collections
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.{mutable, Seq, Set}
@@ -38,7 +38,6 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
-
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index ae372e0..f5110bf 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -64,10 +64,10 @@ class InterBrokerSendThreadTest {
      override def generateRequests() = List[RequestAndCompletionHandler](handler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
- requestTimeoutMs, handler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, handler.handler)
- EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+ EasyMock.expect(networkClient.newClientRequest(
+ EasyMock.eq("1"),
EasyMock.same(handler.request),
EasyMock.anyLong(),
EasyMock.eq(true),
@@ -101,10 +101,10 @@ class InterBrokerSendThreadTest {
      override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler)
}
- val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true,
- requestTimeoutMs, requestAndCompletionHandler.handler)
+    val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestTimeoutMs, requestAndCompletionHandler.handler)
- EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+ EasyMock.expect(networkClient.newClientRequest(
+ EasyMock.eq("1"),
EasyMock.same(requestAndCompletionHandler.request),
EasyMock.anyLong(),
EasyMock.eq(true),
@@ -145,11 +145,18 @@ class InterBrokerSendThreadTest {
      override def generateRequests() = List[RequestAndCompletionHandler](handler)
}
-    val clientRequest = new ClientRequest("dest", request, 0, "1", time.milliseconds(), true,
- requestTimeoutMs, handler.handler)
+ val clientRequest = new ClientRequest("dest",
+ request,
+ 0,
+ "1",
+ time.milliseconds(),
+ true,
+ requestTimeoutMs,
+ handler.handler)
time.sleep(1500)
- EasyMock.expect(networkClient.newClientRequest(EasyMock.eq("1"),
+ EasyMock.expect(networkClient.newClientRequest(
+ EasyMock.eq("1"),
EasyMock.same(handler.request),
EasyMock.eq(time.milliseconds()),
EasyMock.eq(true),
@@ -180,7 +187,6 @@ class InterBrokerSendThreadTest {
Assert.assertTrue(completionHandler.executedWithDisconnectedResponse)
}
-
  private class StubRequestBuilder extends AbstractRequest.Builder(ApiKeys.END_TXN) {
override def build(version: Short): Nothing = ???
}
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
new file mode 100644
index 0000000..acc1fcf
--- /dev/null
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
+import java.util.Collections
+
+import kafka.cluster.{Broker, EndPoint}
+import kafka.utils.TestUtils
+import org.apache.kafka.test.{TestUtils => ClientsTestUtils}
+import org.apache.kafka.clients.{ManualMetadataUpdater, Metadata, MockClient}
+import org.apache.kafka.common.feature.Features
+import org.apache.kafka.common.feature.Features.emptySupportedFeatures
+import org.apache.kafka.common.utils.SystemTime
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class BrokerToControllerRequestThreadTest {
+
+ @Test
+ def testRequestsSent(): Unit = {
+    // just a simple test that tests whether the request from 1 -> 2 is sent and the response callback is called
+ val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+ val controllerId = 2
+
+ val metadata = mock(classOf[Metadata])
+ val mockClient = new MockClient(time, metadata)
+
+ val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+ val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val activeController = new Broker(controllerId,
+      Seq(new EndPoint("host", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, emptySupportedFeatures)
+
+ when(metadataCache.getControllerId).thenReturn(Some(controllerId))
+ when(metadataCache.getAliveBrokers).thenReturn(Seq(activeController))
+    when(metadataCache.getAliveBroker(controllerId)).thenReturn(Some(activeController))
+
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache,
+      config, listenerName, time, "")
+ mockClient.prepareResponse(expectedResponse)
+
+ val responseLatch = new CountDownLatch(1)
+ val queueItem = BrokerToControllerQueueItem(
+ new MetadataRequest.Builder(new MetadataRequestData()), response => {
+ assertEquals(expectedResponse, response.responseBody())
+ responseLatch.countDown()
+ })
+ requestQueue.put(queueItem)
+ // initialize to the controller
+ testRequestThread.doWork()
+ // send and process the request
+ testRequestThread.doWork()
+
+ assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+ }
+
+ @Test
+ def testControllerChanged(): Unit = {
+    // in this test the current broker is 1, and the controller changes from 2 -> 3 then back: 3 -> 2
+ val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+ val oldControllerId = 1
+ val newControllerId = 2
+
+ val metadata = mock(classOf[Metadata])
+ val mockClient = new MockClient(time, metadata)
+
+ val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+ val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val oldController = new Broker(oldControllerId,
+      Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+    val oldControllerNode = oldController.node(listenerName)
+    val newController = new Broker(newControllerId,
+      Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+
+    when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId))
+    when(metadataCache.getAliveBroker(oldControllerId)).thenReturn(Some(oldController))
+    when(metadataCache.getAliveBroker(newControllerId)).thenReturn(Some(newController))
+    when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController))
+
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(),
+      requestQueue, metadataCache, config, listenerName, time, "")
+
+ val responseLatch = new CountDownLatch(1)
+
+ val queueItem = BrokerToControllerQueueItem(
+ new MetadataRequest.Builder(new MetadataRequestData()), response => {
+ assertEquals(expectedResponse, response.responseBody())
+ responseLatch.countDown()
+ })
+ requestQueue.put(queueItem)
+ mockClient.prepareResponse(expectedResponse)
+ // initialize the thread with oldController
+ testRequestThread.doWork()
+ // assert queue correctness
+ assertFalse(requestQueue.isEmpty)
+ assertEquals(1, requestQueue.size())
+ assertEquals(queueItem, requestQueue.peek())
+ // disconnect the node
+ mockClient.setUnreachable(oldControllerNode, time.milliseconds() + 5000)
+ // verify that the client closed the connection to the faulty controller
+ testRequestThread.doWork()
+ assertFalse(requestQueue.isEmpty)
+ assertEquals(1, requestQueue.size())
+ // should connect to the new controller
+ testRequestThread.doWork()
+ // should send the request and process the response
+ testRequestThread.doWork()
+
+ assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+ }
+
+ @Test
+ def testNotController(): Unit = {
+ val time = new SystemTime
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+ val oldControllerId = 1
+ val newControllerId = 2
+
+ val metadata = mock(classOf[Metadata])
+ val mockClient = new MockClient(time, metadata)
+
+ val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
+ val metadataCache = mock(classOf[MetadataCache])
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val oldController = new Broker(oldControllerId,
+      Seq(new EndPoint("host1", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+    val newController = new Broker(2,
+      Seq(new EndPoint("host2", 1234, listenerName, SecurityProtocol.PLAINTEXT)), None, Features.emptySupportedFeatures)
+
+    when(metadataCache.getControllerId).thenReturn(Some(oldControllerId), Some(newControllerId))
+    when(metadataCache.getAliveBrokers).thenReturn(Seq(oldController, newController))
+    when(metadataCache.getAliveBroker(oldControllerId)).thenReturn(Some(oldController))
+    when(metadataCache.getAliveBroker(newControllerId)).thenReturn(Some(newController))
+
+    val responseWithNotControllerError = ClientsTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", new Integer(2)))
+    val expectedResponse = ClientsTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", new Integer(2)))
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), requestQueue, metadataCache,
+      config, listenerName, time, "")
+
+ val responseLatch = new CountDownLatch(1)
+ val queueItem = BrokerToControllerQueueItem(
+ new MetadataRequest.Builder(new MetadataRequestData()
+ .setAllowAutoTopicCreation(true)), response => {
+ assertEquals(expectedResponse, response.responseBody())
+ responseLatch.countDown()
+ })
+ requestQueue.put(queueItem)
+ // initialize to the controller
+ testRequestThread.doWork()
+ // send and process the request
+ mockClient.prepareResponse((body: AbstractRequest) => {
+ body.isInstanceOf[MetadataRequest] &&
+ body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
+ }, responseWithNotControllerError)
+ testRequestThread.doWork()
+ // reinitialize the controller to a different node
+ testRequestThread.doWork()
+ // process the request again
+ mockClient.prepareResponse(expectedResponse)
+ testRequestThread.doWork()
+
+ assertTrue(responseLatch.await(10, TimeUnit.SECONDS))
+ }
+}
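[Editor's note] Worth noting for readers of these tests: the request thread is
never start()ed here. Each doWork() call is driven manually and performs exactly
one step (a controller lookup from the metadata cache, or one send/receive
against the MockClient), which keeps the tests deterministic. The shape of that
pattern, in brief (sketch reusing the names from testRequestsSent above):

    requestQueue.put(queueItem)              // enqueue before the thread looks for work
    mockClient.prepareResponse(expectedResponse)
    testRequestThread.doWork()               // step 1: resolve the controller from the metadata cache
    testRequestThread.doWork()               // step 2: send the request and dispatch the callback
    assertTrue(responseLatch.await(10, TimeUnit.SECONDS))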