This is an automated email from the ASF dual-hosted git repository.

payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26543faa99b MINOR: Migrate NodeToControllerRequestThreadTest from 
Scala to Java (#21693)
26543faa99b is described below

commit 26543faa99bab63ed8e39ad2d194713cfbed372b
Author: Lan Ding <[email protected]>
AuthorDate: Wed Mar 11 19:23:34 2026 +0800

    MINOR: Migrate NodeToControllerRequestThreadTest from Scala to Java (#21693)
    
    Move `NodeToControllerRequestThreadTest` from core  to the server module
    and rewrite in Java.
    
    Reviewers: PoAn Yang <[email protected]>
---
 .../server/NodeToControllerRequestThreadTest.scala | 481 ------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  22 +-
 .../server/NodeToControllerRequestThreadTest.java  | 549 +++++++++++++++++++++
 3 files changed, 550 insertions(+), 502 deletions(-)

diff --git 
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala 
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
deleted file mode 100644
index a90f9034d02..00000000000
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicReference
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
-import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, 
Metadata, MockClient, NodeApiVersions}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.{EnvelopeResponseData, 
MetadataRequestData}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, 
EnvelopeResponse, MetadataRequest, RequestTestUtils}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.server.common.ControllerRequestCompletionHandler
-import org.apache.kafka.server.ControllerInformation
-import org.apache.kafka.server.NodeToControllerRequestThread
-import org.apache.kafka.server.NodeToControllerQueueItem
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.mockito.Mockito._
-
-import java.util.Optional
-import java.util.function.Supplier
-import scala.jdk.OptionConverters.RichOption
-
-
-class NodeToControllerRequestThreadTest {
-
-  private def controllerInfo(node: Option[Node]): ControllerInformation = {
-    new ControllerInformation(node.toJava, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "")
-  }
-
-  private def emptyControllerInfo: ControllerInformation = {
-    controllerInfo(None)
-  }
-
-  @Test
-  def testRetryTimeoutWhileControllerNotAvailable(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-
-    when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
-
-    val retryTimeoutMs = 30000
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs)
-    testRequestThread.setStarted(true)
-
-    val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler
-    )
-
-    testRequestThread.enqueue(queueItem)
-    testRequestThread.doWork()
-    assertEquals(1, testRequestThread.queueSize)
-
-    time.sleep(retryTimeoutMs)
-    testRequestThread.doWork()
-    assertEquals(0, testRequestThread.queueSize)
-    assertTrue(completionHandler.timedOut.get)
-  }
-
-  @Test
-  def testRequestsSent(): Unit = {
-    // just a simple test that tests whether the request from 1 -> 2 is sent 
and the response callback is called
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val controllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val activeController = new Node(controllerId, "host", 1234)
-
-    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
-
-    val expectedResponse = RequestTestUtils.metadataUpdateWith(2, 
java.util.Map.of("a", 2))
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-    mockClient.prepareResponse(expectedResponse)
-
-    val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler
-    )
-
-    testRequestThread.enqueue(queueItem)
-    assertEquals(1, testRequestThread.queueSize)
-
-    // initialize to the controller
-    testRequestThread.doWork()
-    // send and process the request
-    testRequestThread.doWork()
-
-    assertEquals(0, testRequestThread.queueSize)
-    assertTrue(completionHandler.completed.get())
-  }
-
-  @Test
-  def testControllerChanged(): Unit = {
-    // in this test the current broker is 1, and the controller changes from 2 
-> 3 then back: 3 -> 2
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val oldControllerId = 1
-    val newControllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val oldController = new Node(oldControllerId, "host1", 1234)
-    val newController = new Node(newControllerId, "host2", 1234)
-
-    when(controllerNodeProvider.get()).thenReturn(
-      controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
-
-    val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
java.util.Map.of("a", 2))
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-
-    val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler,
-    )
-
-    testRequestThread.enqueue(queueItem)
-    mockClient.prepareResponse(expectedResponse)
-    // initialize the thread with oldController
-    testRequestThread.doWork()
-    assertFalse(completionHandler.completed.get())
-
-    // disconnect the node
-    mockClient.setUnreachable(oldController, time.milliseconds() + 5000)
-    // verify that the client closed the connection to the faulty controller
-    testRequestThread.doWork()
-    // should connect to the new controller
-    testRequestThread.doWork()
-    // should send the request and process the response
-    testRequestThread.doWork()
-
-    assertTrue(completionHandler.completed.get())
-  }
-
-  @Test
-  def testNotController(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val oldControllerId = 1
-    val newControllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val port = 1234
-    val oldController = new Node(oldControllerId, "host1", port)
-    val newController = new Node(newControllerId, "host2", port)
-
-    when(controllerNodeProvider.get()).thenReturn(
-      controllerInfo(Some(oldController)), controllerInfo(Some(newController)))
-
-    val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      java.util.Map.of("a", Errors.NOT_CONTROLLER),
-      java.util.Map.of("a", 2))
-    val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
java.util.Map.of("a", 2))
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-
-    val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()
-        .setAllowAutoTopicCreation(true)),
-      completionHandler
-    )
-    testRequestThread.enqueue(queueItem)
-    // initialize to the controller
-    testRequestThread.doWork()
-
-    val oldBrokerNode = new Node(oldControllerId, "host1", port)
-    assertEquals(Some(oldBrokerNode).toJava, 
testRequestThread.activeControllerAddress())
-
-    // send and process the request
-    mockClient.prepareResponse((body: AbstractRequest) => {
-      body.isInstanceOf[MetadataRequest] &&
-      body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
-    }, responseWithNotControllerError)
-    testRequestThread.doWork()
-    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
-    // reinitialize the controller to a different node
-    testRequestThread.doWork()
-    // process the request again
-    mockClient.prepareResponse(expectedResponse)
-    testRequestThread.doWork()
-
-    val newControllerNode = new Node(newControllerId, "host2", port)
-    assertEquals(Some(newControllerNode).toJava, 
testRequestThread.activeControllerAddress())
-
-    assertTrue(completionHandler.completed.get())
-  }
-
-  @Test
-  def testEnvelopeResponseWithNotControllerError(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val oldControllerId = 1
-    val newControllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-    // enable envelope API
-    mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, 
0.toShort, 0.toShort))
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val port = 1234
-    val oldController = new Node(oldControllerId, "host1", port)
-    val newController = new Node(newControllerId, "host2", port)
-
-    when(controllerNodeProvider.get()).thenReturn(
-      controllerInfo(Some(oldController)),
-      controllerInfo(Some(newController))
-    )
-
-    // create an envelopeResponse with NOT_CONTROLLER error
-    val envelopeResponseWithNotControllerError = new EnvelopeResponse(
-      new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()))
-
-    // response for retry request after receiving NOT_CONTROLLER error
-    val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
java.util.Map.of("a", 2))
-
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-
-    val completionHandler = new 
TestControllerRequestCompletionHandler(Some(expectedResponse))
-    val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"principal", true)
-    val kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
-
-    // build an EnvelopeRequest by dummy data
-    val envelopeRequestBuilder = new 
EnvelopeRequest.Builder(ByteBuffer.allocate(0),
-      kafkaPrincipalBuilder.serialize(kafkaPrincipal), 
"client-address".getBytes)
-
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      envelopeRequestBuilder,
-      completionHandler
-    )
-
-    testRequestThread.enqueue(queueItem)
-    // initialize to the controller
-    testRequestThread.doWork()
-
-    val oldBrokerNode = new Node(oldControllerId, "host1", port)
-    assertEquals(Some(oldBrokerNode).toJava, 
testRequestThread.activeControllerAddress())
-
-    // send and process the envelope request
-    mockClient.prepareResponse((body: AbstractRequest) => {
-      body.isInstanceOf[EnvelopeRequest]
-    }, envelopeResponseWithNotControllerError)
-    testRequestThread.doWork()
-    // expect to reset the activeControllerAddress after finding the 
NOT_CONTROLLER error
-    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
-    // reinitialize the controller to a different node
-    testRequestThread.doWork()
-    // process the request again
-    mockClient.prepareResponse(expectedResponse)
-    testRequestThread.doWork()
-
-    val newControllerNode = new Node(newControllerId, "host2", port)
-    assertEquals(Some(newControllerNode).toJava, 
testRequestThread.activeControllerAddress())
-
-    assertTrue(completionHandler.completed.get())
-  }
-
-  @Test
-  def testRetryTimeout(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val controllerId = 1
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val controller = new Node(controllerId, "host1", 1234)
-
-    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(controller)))
-
-    val retryTimeoutMs = 30000
-    val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      java.util.Map.of("a", Errors.NOT_CONTROLLER),
-      java.util.Map.of("a", 2))
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient, new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", retryTimeoutMs)
-    testRequestThread.setStarted(true)
-
-    val completionHandler = new TestControllerRequestCompletionHandler()
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()
-        .setAllowAutoTopicCreation(true)),
-      completionHandler
-    )
-
-    testRequestThread.enqueue(queueItem)
-
-    // initialize to the controller
-    testRequestThread.doWork()
-
-    time.sleep(retryTimeoutMs)
-
-    // send and process the request
-    mockClient.prepareResponse((body: AbstractRequest) => {
-      body.isInstanceOf[MetadataRequest] &&
-        body.asInstanceOf[MetadataRequest].allowAutoTopicCreation()
-    }, responseWithNotControllerError)
-
-    testRequestThread.doWork()
-
-    assertTrue(completionHandler.timedOut.get())
-  }
-
-  @Test
-  def testUnsupportedVersionHandling(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val controllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val activeController = new Node(controllerId, "host", 1234)
-
-    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
-
-    val callbackResponse = new AtomicReference[ClientResponse]()
-    val completionHandler = new ControllerRequestCompletionHandler {
-      override def onTimeout(): Unit = fail("Unexpected timeout exception")
-      override def onComplete(response: ClientResponse): Unit = 
callbackResponse.set(response)
-    }
-
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler
-    )
-
-    mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == 
ApiKeys.METADATA)
-
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,   new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-
-    testRequestThread.enqueue(queueItem)
-    pollUntil(testRequestThread, () => callbackResponse.get != null)
-    assertNotNull(callbackResponse.get.versionMismatch)
-  }
-
-  @Test
-  def testAuthenticationExceptionHandling(): Unit = {
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-    val controllerId = 2
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    val activeController = new Node(controllerId, "host", 1234)
-
-    
when(controllerNodeProvider.get()).thenReturn(controllerInfo(Some(activeController)))
-
-    val callbackResponse = new AtomicReference[ClientResponse]()
-    val completionHandler = new ControllerRequestCompletionHandler {
-      override def onTimeout(): Unit = fail("Unexpected timeout exception")
-      override def onComplete(response: ClientResponse): Unit = 
callbackResponse.set(response)
-    }
-
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler
-    )
-
-    mockClient.createPendingAuthenticationError(activeController, 50)
-
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,  new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-    testRequestThread.setStarted(true)
-
-    testRequestThread.enqueue(queueItem)
-    pollUntil(testRequestThread, () => callbackResponse.get != null)
-    assertNotNull(callbackResponse.get.authenticationException)
-    assertEquals(Optional.empty(), testRequestThread.activeControllerAddress())
-  }
-
-  @Test
-  def testThreadNotStarted(): Unit = {
-    // Make sure we throw if we enqueue anything while the thread is not 
running
-    val time = new MockTime()
-    val config = new KafkaConfig(TestUtils.createBrokerConfig(1))
-
-    val metadata = mock(classOf[Metadata])
-    val mockClient = new MockClient(time, metadata)
-
-    val controllerNodeProvider = mock(classOf[Supplier[ControllerInformation]])
-    when(controllerNodeProvider.get()).thenReturn(emptyControllerInfo)
-
-    val testRequestThread = new NodeToControllerRequestThread(
-      mockClient,   new ManualMetadataUpdater(),
-      controllerNodeProvider, config, time, "", Long.MaxValue)
-
-    val completionHandler = new TestControllerRequestCompletionHandler(None)
-    val queueItem = new NodeToControllerQueueItem(
-      time.milliseconds(),
-      new MetadataRequest.Builder(new MetadataRequestData()),
-      completionHandler
-    )
-
-    assertThrows(classOf[IllegalStateException], () => 
testRequestThread.enqueue(queueItem))
-    assertEquals(0, testRequestThread.queueSize)
-  }
-
-  private def pollUntil(
-    requestThread: NodeToControllerRequestThread,
-    condition: () => Boolean,
-    maxRetries: Int = 10
-  ): Unit = {
-    var tries = 0
-    do {
-      requestThread.doWork()
-      tries += 1
-    } while (!condition.apply() && tries < maxRetries)
-
-    if (!condition.apply()) {
-      fail(s"Condition failed to be met after polling $tries times")
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1c8bcb488ef..2a17972e2c1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common._
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBindingFilter}
 import org.apache.kafka.common.compress.Compression
@@ -53,7 +52,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
-import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
TopicIdPartition}
+import org.apache.kafka.server.common.TopicIdPartition
 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.util.MockTime
@@ -1520,23 +1519,4 @@ object TestUtils extends Logging {
     envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
     envelopRequest
   }
-
-  class TestControllerRequestCompletionHandler(expectedResponse: 
Option[AbstractResponse] = None)
-    extends ControllerRequestCompletionHandler {
-    var actualResponse: Option[ClientResponse] = Option.empty
-    val completed: AtomicBoolean = new AtomicBoolean(false)
-    val timedOut: AtomicBoolean = new AtomicBoolean(false)
-
-    override def onComplete(response: ClientResponse): Unit = {
-      actualResponse = Some(response)
-      expectedResponse.foreach { expected =>
-        assertEquals(expected, response.responseBody())
-      }
-      completed.set(true)
-    }
-
-    override def onTimeout(): Unit = {
-      timedOut.set(true)
-    }
-  }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
 
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
new file mode 100644
index 00000000000..5341db3c12d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/NodeToControllerRequestThreadTest.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.message.EnvelopeResponseData;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.config.ReplicationConfigs;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+
+class NodeToControllerRequestThreadTest {
+
+    private static AbstractConfig createConfig() {
+        return new AbstractConfig(ReplicationConfigs.CONFIG_DEF, Map.of());
+    }
+
+    private static ControllerInformation controllerInfo(Optional<Node> node) {
+        return new ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "");
+    }
+
+    private static ControllerInformation emptyControllerInfo() {
+        return controllerInfo(Optional.empty());
+    }
+
+    /**
+     * Creates a supplier that returns {@code first} on the first call,
+     * and {@code second} on all subsequent calls, matching the behavior
+     * of Mockito's {@code thenReturn(first, second)}.
+     *
+     * <p>This avoids mocking {@code Supplier<ControllerInformation>} which 
would
+     * require {@code @SuppressWarnings("unchecked")} due to generic type 
erasure.
+     */
+    private static Supplier<ControllerInformation> sequentialProvider(
+            ControllerInformation first, ControllerInformation second) {
+        AtomicReference<ControllerInformation> ref = new 
AtomicReference<>(first);
+        return () -> ref.getAndSet(second);
+    }
+
+    @Test
+    void testRetryTimeoutWhileControllerNotAvailable() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Supplier<ControllerInformation> controllerNodeProvider = 
NodeToControllerRequestThreadTest::emptyControllerInfo;
+
+        long retryTimeoutMs = 30000;
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", retryTimeoutMs);
+        testRequestThread.setStarted(true);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(null);
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        testRequestThread.enqueue(queueItem);
+        testRequestThread.doWork();
+        assertEquals(1, testRequestThread.queueSize());
+
+        time.sleep(retryTimeoutMs);
+        testRequestThread.doWork();
+        assertEquals(0, testRequestThread.queueSize());
+        assertTrue(completionHandler.timedOut.get());
+    }
+
+    @Test
+    void testRequestsSent() {
+        // just a simple test that tests whether the request from 1 -> 2 is 
sent and the response callback is called
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int controllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Node activeController = new Node(controllerId, "host", 1234);
+        Supplier<ControllerInformation> controllerNodeProvider =
+            () -> controllerInfo(Optional.of(activeController));
+
+        MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(2, Map.of("a", 2));
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+        mockClient.prepareResponse(expectedResponse);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(expectedResponse);
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        testRequestThread.enqueue(queueItem);
+        assertEquals(1, testRequestThread.queueSize());
+
+        // initialize to the controller
+        testRequestThread.doWork();
+        // send and process the request
+        testRequestThread.doWork();
+
+        assertEquals(0, testRequestThread.queueSize());
+        assertTrue(completionHandler.completed.get());
+    }
+
+    @Test
+    void testControllerChanged() {
+        // in this test the controller changes from node 1 -> node 2
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int oldControllerId = 1;
+        int newControllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Node oldController = new Node(oldControllerId, "host1", 1234);
+        Node newController = new Node(newControllerId, "host2", 1234);
+        Supplier<ControllerInformation> controllerNodeProvider = 
sequentialProvider(
+            controllerInfo(Optional.of(oldController)),
+            controllerInfo(Optional.of(newController)));
+
+        MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(expectedResponse);
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        testRequestThread.enqueue(queueItem);
+        mockClient.prepareResponse(expectedResponse);
+        // initialize the thread with oldController
+        testRequestThread.doWork();
+        assertFalse(completionHandler.completed.get());
+
+        // disconnect the node
+        mockClient.setUnreachable(oldController, time.milliseconds() + 5000);
+        // verify that the client closed the connection to the faulty 
controller
+        testRequestThread.doWork();
+        // should connect to the new controller
+        testRequestThread.doWork();
+        // should send the request and process the response
+        testRequestThread.doWork();
+
+        assertTrue(completionHandler.completed.get());
+    }
+
+    @Test
+    void testNotController() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int oldControllerId = 1;
+        int newControllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        int port = 1234;
+        Node oldController = new Node(oldControllerId, "host1", port);
+        Node newController = new Node(newControllerId, "host2", port);
+        Supplier<ControllerInformation> controllerNodeProvider = 
sequentialProvider(
+            controllerInfo(Optional.of(oldController)),
+            controllerInfo(Optional.of(newController)));
+
+        MetadataResponse responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
+            Map.of("a", Errors.NOT_CONTROLLER),
+            Map.of("a", 2));
+        MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(expectedResponse);
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()
+                .setAllowAutoTopicCreation(true)),
+            completionHandler
+        );
+        testRequestThread.enqueue(queueItem);
+        // initialize to the controller
+        testRequestThread.doWork();
+
+        Node oldBrokerNode = new Node(oldControllerId, "host1", port);
+        assertEquals(Optional.of(oldBrokerNode), 
testRequestThread.activeControllerAddress());
+
+        // send and process the request
+        mockClient.prepareResponse(
+            body -> body instanceof MetadataRequest && ((MetadataRequest) 
body).allowAutoTopicCreation(),
+            responseWithNotControllerError);
+        testRequestThread.doWork();
+        assertEquals(Optional.empty(), 
testRequestThread.activeControllerAddress());
+        // reinitialize the controller to a different node
+        testRequestThread.doWork();
+        // process the request again
+        mockClient.prepareResponse(expectedResponse);
+        testRequestThread.doWork();
+
+        Node newControllerNode = new Node(newControllerId, "host2", port);
+        assertEquals(Optional.of(newControllerNode), 
testRequestThread.activeControllerAddress());
+
+        assertTrue(completionHandler.completed.get());
+    }
+
+    @Test
+    void testEnvelopeResponseWithNotControllerError() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int oldControllerId = 1;
+        int newControllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+        // enable envelope API
+        
mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, 
(short) 0, (short) 0));
+
+        int port = 1234;
+        Node oldController = new Node(oldControllerId, "host1", port);
+        Node newController = new Node(newControllerId, "host2", port);
+        Supplier<ControllerInformation> controllerNodeProvider = 
sequentialProvider(
+            controllerInfo(Optional.of(oldController)),
+            controllerInfo(Optional.of(newController)));
+
+        // create an envelopeResponse with NOT_CONTROLLER error
+        EnvelopeResponse envelopeResponseWithNotControllerError = new 
EnvelopeResponse(
+            new 
EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()));
+
+        // response for retry request after receiving NOT_CONTROLLER error
+        MetadataResponse expectedResponse = 
RequestTestUtils.metadataUpdateWith(3, Map.of("a", 2));
+
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(expectedResponse);
+        KafkaPrincipal kafkaPrincipal = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true);
+        DefaultKafkaPrincipalBuilder kafkaPrincipalBuilder = new 
DefaultKafkaPrincipalBuilder(null, null);
+
+        // build an EnvelopeRequest by dummy data
+        EnvelopeRequest.Builder envelopeRequestBuilder = new 
EnvelopeRequest.Builder(ByteBuffer.allocate(0),
+            kafkaPrincipalBuilder.serialize(kafkaPrincipal), 
"client-address".getBytes());
+
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            envelopeRequestBuilder,
+            completionHandler
+        );
+
+        testRequestThread.enqueue(queueItem);
+        // initialize to the controller
+        testRequestThread.doWork();
+
+        Node oldBrokerNode = new Node(oldControllerId, "host1", port);
+        assertEquals(Optional.of(oldBrokerNode), 
testRequestThread.activeControllerAddress());
+
+        // send and process the envelope request
+        mockClient.prepareResponse(
+            body -> body instanceof EnvelopeRequest,
+            envelopeResponseWithNotControllerError);
+        testRequestThread.doWork();
+        // expect to reset the activeControllerAddress after finding the 
NOT_CONTROLLER error
+        assertEquals(Optional.empty(), 
testRequestThread.activeControllerAddress());
+        // reinitialize the controller to a different node
+        testRequestThread.doWork();
+        // process the request again
+        mockClient.prepareResponse(expectedResponse);
+        testRequestThread.doWork();
+
+        Node newControllerNode = new Node(newControllerId, "host2", port);
+        assertEquals(Optional.of(newControllerNode), 
testRequestThread.activeControllerAddress());
+
+        assertTrue(completionHandler.completed.get());
+    }
+
+    @Test
+    void testRetryTimeout() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int controllerId = 1;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Node controller = new Node(controllerId, "host1", 1234);
+        Supplier<ControllerInformation> controllerNodeProvider =
+            () -> controllerInfo(Optional.of(controller));
+
+        long retryTimeoutMs = 30000;
+        MetadataResponse responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
+            Map.of("a", Errors.NOT_CONTROLLER),
+            Map.of("a", 2));
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", retryTimeoutMs);
+        testRequestThread.setStarted(true);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler();
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()
+                .setAllowAutoTopicCreation(true)),
+            completionHandler
+        );
+
+        testRequestThread.enqueue(queueItem);
+
+        // initialize to the controller
+        testRequestThread.doWork();
+
+        time.sleep(retryTimeoutMs);
+
+        // send and process the request
+        mockClient.prepareResponse(
+            body -> body instanceof MetadataRequest && ((MetadataRequest) 
body).allowAutoTopicCreation(),
+            responseWithNotControllerError);
+
+        testRequestThread.doWork();
+
+        assertTrue(completionHandler.timedOut.get());
+    }
+
+    @Test
+    void testUnsupportedVersionHandling() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int controllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Node activeController = new Node(controllerId, "host", 1234);
+        Supplier<ControllerInformation> controllerNodeProvider =
+            () -> controllerInfo(Optional.of(activeController));
+
+        AtomicReference<ClientResponse> callbackResponse = new 
AtomicReference<>();
+        ControllerRequestCompletionHandler completionHandler = new 
ControllerRequestCompletionHandler() {
+            @Override
+            public void onTimeout() {
+                fail("Unexpected timeout exception");
+            }
+
+            @Override
+            public void onComplete(ClientResponse response) {
+                callbackResponse.set(response);
+            }
+        };
+
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        mockClient.prepareUnsupportedVersionResponse(request -> 
request.apiKey() == ApiKeys.METADATA);
+
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+
+        testRequestThread.enqueue(queueItem);
+        pollUntil(testRequestThread, () -> callbackResponse.get() != null);
+        assertNotNull(callbackResponse.get().versionMismatch());
+    }
+
+    @Test
+    void testAuthenticationExceptionHandling() {
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+        int controllerId = 2;
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Node activeController = new Node(controllerId, "host", 1234);
+        Supplier<ControllerInformation> controllerNodeProvider =
+            () -> controllerInfo(Optional.of(activeController));
+
+        AtomicReference<ClientResponse> callbackResponse = new 
AtomicReference<>();
+        ControllerRequestCompletionHandler completionHandler = new 
ControllerRequestCompletionHandler() {
+            @Override
+            public void onTimeout() {
+                fail("Unexpected timeout exception");
+            }
+
+            @Override
+            public void onComplete(ClientResponse response) {
+                callbackResponse.set(response);
+            }
+        };
+
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        mockClient.createPendingAuthenticationError(activeController, 50);
+
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+        testRequestThread.setStarted(true);
+
+        testRequestThread.enqueue(queueItem);
+        pollUntil(testRequestThread, () -> callbackResponse.get() != null);
+        assertNotNull(callbackResponse.get().authenticationException());
+        assertEquals(Optional.empty(), 
testRequestThread.activeControllerAddress());
+    }
+
+    @Test
+    void testThreadNotStarted() {
+        // Make sure we throw if we enqueue anything while the thread is not 
running
+        MockTime time = new MockTime();
+        AbstractConfig config = createConfig();
+
+        Metadata metadata = mock(Metadata.class);
+        MockClient mockClient = new MockClient(time, metadata);
+
+        Supplier<ControllerInformation> controllerNodeProvider = 
NodeToControllerRequestThreadTest::emptyControllerInfo;
+
+        NodeToControllerRequestThread testRequestThread = new 
NodeToControllerRequestThread(
+            mockClient, new ManualMetadataUpdater(),
+            controllerNodeProvider, config, time, "", Long.MAX_VALUE);
+
+        TestControllerRequestCompletionHandler completionHandler =
+            new TestControllerRequestCompletionHandler(null);
+        NodeToControllerQueueItem queueItem = new NodeToControllerQueueItem(
+            time.milliseconds(),
+            new MetadataRequest.Builder(new MetadataRequestData()),
+            completionHandler
+        );
+
+        assertThrows(IllegalStateException.class, () -> 
testRequestThread.enqueue(queueItem));
+        assertEquals(0, testRequestThread.queueSize());
+    }
+
+    private void pollUntil(NodeToControllerRequestThread requestThread, 
java.util.function.BooleanSupplier condition) {
+        pollUntil(requestThread, condition, 10);
+    }
+
+    private void pollUntil(NodeToControllerRequestThread requestThread, 
java.util.function.BooleanSupplier condition, int maxRetries) {
+        int tries = 0;
+        do {
+            requestThread.doWork();
+            tries++;
+        } while (!condition.getAsBoolean() && tries < maxRetries);
+
+        if (!condition.getAsBoolean()) {
+            fail("Condition failed to be met after polling " + tries + " 
times");
+        }
+    }
+
+    private static class TestControllerRequestCompletionHandler implements 
ControllerRequestCompletionHandler {
+        private final AbstractResponse expectedResponse;
+        final AtomicBoolean completed = new AtomicBoolean(false);
+        final AtomicBoolean timedOut = new AtomicBoolean(false);
+
+        TestControllerRequestCompletionHandler() {
+            this(null);
+        }
+
+        TestControllerRequestCompletionHandler(AbstractResponse 
expectedResponse) {
+            this.expectedResponse = expectedResponse;
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            if (expectedResponse != null) {
+                assertEquals(expectedResponse, response.responseBody());
+            }
+            completed.set(true);
+        }
+
+        @Override
+        public void onTimeout() {
+            timedOut.set(true);
+        }
+    }
+}


Reply via email to