This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 645c1ba526 MINOR: Fix buildResponseSend test cases for envelope responses (#12185)
645c1ba526 is described below

commit 645c1ba526ec11049429dc5e9ba347fc386df58e
Author: Jason Gustafson <[email protected]>
AuthorDate: Mon May 30 11:34:36 2022 -0700

    MINOR: Fix buildResponseSend test cases for envelope responses (#12185)
    
    The test cases we have in `RequestChannelTest` for `buildResponseSend`
    construct the envelope request incorrectly. The request is created using the
    envelope context, but also a reference to the wrapped envelope request object.
    This patch fixes `TestUtils.buildEnvelopeRequest` so that the wrapped request
    is built properly. It also fixes the dependence on this incorrect construction
    and consolidates the tests in `RequestChannelTest` to avoid duplication.
    
    Reviewers: dengziming <[email protected]>, David Jacot <[email protected]>
---
 .../main/scala/kafka/server/EnvelopeUtils.scala    |   3 +-
 .../unit/kafka/network/RequestChannelTest.scala    | 163 +++++++++++----------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   6 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  26 ++--
 4 files changed, 98 insertions(+), 100 deletions(-)

diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
index ec8871f382..a162ae5fe8 100644
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -32,7 +32,8 @@ object EnvelopeUtils {
   def handleEnvelopeRequest(
     request: RequestChannel.Request,
     requestChannelMetrics: RequestChannel.Metrics,
-    handler: RequestChannel.Request => Unit): Unit = {
+    handler: RequestChannel.Request => Unit
+  ): Unit = {
     val envelope = request.body[EnvelopeRequest]
     val forwardedPrincipal = parseForwardedPrincipal(request.context, 
envelope.requestPrincipal)
     val forwardedClientAddress = 
parseForwardedClientAddress(envelope.clientAddress)
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index f3f8ca884c..bddf03a136 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -18,42 +18,44 @@
 package kafka.network
 
 
-import java.io.IOException
-import java.net.InetAddress
-import java.nio.ByteBuffer
-import java.util.Collections
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
+import kafka.server.EnvelopeUtils
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, 
SslConfigs, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
-import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, 
ListenerName}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, 
RequestTestUtils}
+import org.apache.kafka.common.message.{CreateTopicsRequestData, 
CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.test
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.Mockito.mock
-import org.mockito.{ArgumentCaptor, Mockito}
 
+import java.io.IOException
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicReference
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class RequestChannelTest {
   private val requestChannelMetrics: RequestChannel.Metrics = 
mock(classOf[RequestChannel.Metrics])
-  private val clientId = "id"
   private val principalSerde = new KafkaPrincipalSerde() {
     override def serialize(principal: KafkaPrincipal): Array[Byte] = 
Utils.utf8(principal.toString)
     override def deserialize(bytes: Array[Byte]): KafkaPrincipal = 
SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
   }
-  private val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
 
   @Test
   def testAlterRequests(): Unit = {
@@ -191,84 +193,66 @@ class RequestChannelTest {
     
assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
-  @Test
-  def 
testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): 
Unit = {
-    val channelRequest = 
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = 
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response without error
-    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, 
Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = 
envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  @ParameterizedTest
+  @EnumSource(value=classOf[Errors], names=Array("NONE", 
"CLUSTER_AUTHORIZATION_FAILED", "NOT_CONTROLLER"))
+  def testBuildEnvelopeResponse(error: Errors): Unit = {
+    val topic = "foo"
+    val createTopicRequest = buildCreateTopicRequest(topic)
+    val unwrapped = buildUnwrappedEnvelopeRequest(createTopicRequest)
+
+    val createTopicResponse = buildCreateTopicResponse(topic, error)
+    val envelopeResponse = buildEnvelopeResponse(unwrapped, 
createTopicResponse)
+
+    error match {
+      case Errors.NOT_CONTROLLER =>
+        assertEquals(Errors.NOT_CONTROLLER, envelopeResponse.error)
+        assertNull(envelopeResponse.responseData)
+      case _ =>
+        assertEquals(Errors.NONE, envelopeResponse.error)
+        val unwrappedResponse = 
AbstractResponse.parseResponse(envelopeResponse.responseData, unwrapped.header)
+        assertEquals(createTopicResponse.data, unwrappedResponse.data)
+    }
   }
 
-  @Test
-  def 
testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError():
 Unit = {
-    val channelRequest = 
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = 
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response with REQUEST_TIMED_OUT error
-    val responseWithTimeoutError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      Collections.singletonMap("a", Errors.REQUEST_TIMED_OUT),
-      Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithTimeoutError)
-
-    // expect the envelopeResponse result without error
-    val capturedValue: EnvelopeResponse = 
envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NONE))
+  private def buildCreateTopicRequest(topic: String): CreateTopicsRequest = {
+    val requestData = new CreateTopicsRequestData()
+    requestData.topics.add(new CreatableTopic()
+      .setName(topic)
+      .setReplicationFactor(-1)
+      .setNumPartitions(-1)
+    )
+    new CreateTopicsRequest.Builder(requestData).build()
   }
 
-  @Test
-  def 
testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne():
 Unit = {
-    val channelRequest = 
buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
-
-    val envelopeResponseArgumentCaptor = 
ArgumentCaptor.forClass(classOf[EnvelopeResponse])
-
-    Mockito.doAnswer(_ => mockSend)
-      
.when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
-
-    // create an inner response with NOT_CONTROLLER error
-    val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
-      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
-      Collections.singletonMap("a", 2))
-
-    // build an envelope response
-    channelRequest.buildResponseSend(responseWithNotControllerError)
-
-    // expect the envelopeResponse result has NOT_CONTROLLER error
-    val capturedValue: EnvelopeResponse = 
envelopeResponseArgumentCaptor.getValue
-    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  private def buildCreateTopicResponse(
+    topic: String,
+    error: Errors,
+  ): CreateTopicsResponse = {
+    val responseData = new CreateTopicsResponseData()
+    responseData.topics.add(new CreateTopicsResponseData.CreatableTopicResult()
+      .setName(topic)
+      .setErrorCode(error.code)
+    )
+    new CreateTopicsResponse(responseData)
   }
 
-  private def buildMetadataRequest(): AbstractRequest = {
-    val resourceName = "topic-1"
-    val header = new RequestHeader(ApiKeys.METADATA, 
ApiKeys.METADATA.latestVersion,
-      clientId, 0)
+  private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): 
RequestChannel.Request = {
+    val wrappedRequest = TestUtils.buildEnvelopeRequest(
+      request,
+      principalSerde,
+      requestChannelMetrics,
+      System.nanoTime()
+    )
 
-    new MetadataRequest.Builder(Collections.singletonList(resourceName), 
true).build(header.apiVersion)
-  }
+    val unwrappedRequest = new AtomicReference[RequestChannel.Request]()
 
-  private def buildForwardRequestWithEnvelopeRequestAttached(request: 
AbstractRequest): RequestChannel.Request = {
-    val envelopeRequest = TestUtils.buildRequestWithEnvelope(
-      request, principalSerde, requestChannelMetrics, System.nanoTime(), 
shouldSpyRequestContext = true)
+    EnvelopeUtils.handleEnvelopeRequest(
+      wrappedRequest,
+      requestChannelMetrics,
+      request => unwrappedRequest.set(request)
+    )
 
-    TestUtils.buildRequestWithEnvelope(
-      request, principalSerde, requestChannelMetrics, System.nanoTime(), 
envelope = Option(envelopeRequest))
+    unwrappedRequest.get()
   }
 
   private def isValidJson(str: String): Boolean = {
@@ -312,4 +296,23 @@ class RequestChannelTest {
   private def toMap(config: 
IncrementalAlterConfigsRequestData.AlterableConfigCollection): Map[String, 
String] = {
     config.asScala.map(e => e.name -> e.value).toMap
   }
+
+  private def buildEnvelopeResponse(
+    unwrapped: RequestChannel.Request,
+    response: AbstractResponse
+  ): EnvelopeResponse = {
+    assertTrue(unwrapped.envelope.isDefined)
+    val envelope = unwrapped.envelope.get
+
+    val send = unwrapped.buildResponseSend(response)
+    val sendBytes = test.TestUtils.toBuffer(send)
+
+    // We need to read the size field before `parseResponse` below
+    val size = sendBytes.getInt
+    assertEquals(size, sendBytes.remaining())
+    val envelopeResponse = AbstractResponse.parseResponse(sendBytes, 
envelope.header)
+
+    assertTrue(envelopeResponse.isInstanceOf[EnvelopeResponse])
+    envelopeResponse.asInstanceOf[EnvelopeResponse]
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 74722d5e49..cc34cabe05 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -307,7 +307,7 @@ class KafkaApisTest {
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
-    val request = TestUtils.buildRequestWithEnvelope(
+    val request = TestUtils.buildEnvelopeRequest(
       alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
 
     val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = 
ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
@@ -341,7 +341,7 @@ class KafkaApisTest {
 
     when(controller.isActive).thenReturn(true)
 
-    val request = TestUtils.buildRequestWithEnvelope(
+    val request = TestUtils.buildEnvelopeRequest(
       leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
@@ -396,7 +396,7 @@ class KafkaApisTest {
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false)
       .build(requestHeader.apiVersion)
 
-    val request = TestUtils.buildRequestWithEnvelope(
+    val request = TestUtils.buildEnvelopeRequest(
       alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds(), fromPrivilegedListener)
 
     val capturedResponse: ArgumentCaptor[AbstractResponse] = 
ArgumentCaptor.forClass(classOf[AbstractResponse])
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e097dbd620..ad3c34f960 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -76,7 +76,6 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.jupiter.api.Assertions._
-import org.mockito.Mockito
 
 import scala.annotation.nowarn
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -2172,14 +2171,13 @@ object TestUtils extends Logging {
     }
   }
 
-  def buildRequestWithEnvelope(request: AbstractRequest,
-                               principalSerde: KafkaPrincipalSerde,
-                               requestChannelMetrics: RequestChannel.Metrics,
-                               startTimeNanos: Long,
-                               fromPrivilegedListener: Boolean = true,
-                               shouldSpyRequestContext: Boolean = false,
-                               envelope: Option[RequestChannel.Request] = None
-                              ): RequestChannel.Request = {
+  def buildEnvelopeRequest(
+    request: AbstractRequest,
+    principalSerde: KafkaPrincipalSerde,
+    requestChannelMetrics: RequestChannel.Metrics,
+    startTimeNanos: Long,
+    fromPrivilegedListener: Boolean = true
+  ): RequestChannel.Request = {
     val clientId = "id"
     val listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
 
@@ -2195,22 +2193,18 @@ object TestUtils extends Logging {
 
     RequestHeader.parse(envelopeBuffer)
 
-    var requestContext = new RequestContext(envelopeHeader, "1", 
InetAddress.getLocalHost,
+    val envelopeContext = new RequestContext(envelopeHeader, "1", 
InetAddress.getLocalHost,
       KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, 
ClientInformation.EMPTY,
       fromPrivilegedListener, Optional.of(principalSerde))
 
-    if (shouldSpyRequestContext) {
-      requestContext = Mockito.spy(requestContext)
-    }
-
     new RequestChannel.Request(
       processor = 1,
-      context = requestContext,
+      context = envelopeContext,
       startTimeNanos = startTimeNanos,
       memoryPool = MemoryPool.NONE,
       buffer = envelopeBuffer,
       metrics = requestChannelMetrics,
-      envelope = envelope
+      envelope = None
     )
   }
 

Reply via email to