This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 0af0f0d  KAFKA-12677: Return not_controller error in envelope response itself in KRaft mode (#10794)
0af0f0d is described below

commit 0af0f0daba39b451d0781b27f66e07a5a5fdae14
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 13 00:17:46 2021 +0800

    KAFKA-12677: Return not_controller error in envelope response itself in KRaft mode (#10794)
    
    In KRaft mode, a request from a client to the controller flows like this:
    1. The client sends the request to a random controller (e.g. A-controller).
    2. A-controller forwards the request to the active controller (e.g. B-controller), which handles it.
    3. After the active B-controller completes the request, A-controller receives the response and checks it:
      3.1. If the response is "disconnected" or carries a "NOT_CONTROLLER" error, the cached active controller has changed. Clear the cached active controller and wait for the next retry to get the updated active controller from `controllerNodeProvider`.
      3.2. Otherwise, complete the request and respond to the client.
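
    The check in step 3 can be sketched roughly as follows. This is a minimal illustration, not the Kafka implementation: `ForwardingSketch`, `Outcome`, and the string node names are hypothetical, and only `controllerNodeProvider` is a name taken from the description above.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch of the step 3 check; these are not Kafka's actual classes. */
final class ForwardingSketch {
    /** Simplified view of what the forwarding controller sees for one attempt. */
    enum Outcome { DISCONNECTED, NOT_CONTROLLER, OK }

    // Assumption: the provider yields the current active controller, if known.
    private final Supplier<Optional<String>> controllerNodeProvider;
    private Optional<String> cachedActiveController = Optional.empty();

    ForwardingSketch(Supplier<Optional<String>> controllerNodeProvider) {
        this.controllerNodeProvider = controllerNodeProvider;
    }

    /** Returns the cached active controller, asking the provider only when the cache is empty. */
    Optional<String> activeController() {
        if (!cachedActiveController.isPresent()) {
            cachedActiveController = controllerNodeProvider.get();
        }
        return cachedActiveController;
    }

    /** Returns true if the request is complete, false if it has to be retried. */
    boolean handleResponse(Outcome outcome) {
        if (outcome == Outcome.DISCONNECTED || outcome == Outcome.NOT_CONTROLLER) {
            // Step 3.1: the cached controller is stale, so drop it and retry later.
            cachedActiveController = Optional.empty();
            return false;
        }
        // Step 3.2: the response can be passed back to the client.
        return true;
    }
}
```

    In this sketch the retry simply calls `activeController()` again, which re-reads the provider because the cache was cleared.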
    
    This bug consists of two issues:
    1. The "NOT_CONTROLLER" error is not sent back to the requester correctly; `UNKNOWN_SERVER_ERROR` is returned instead. The reason is that the `NotControllerException` is wrapped in a `CompletionException` when the `Future` completes exceptionally. The `CompletionException` does not match any of the Errors we define, so `UNKNOWN_SERVER_ERROR` is returned. Even if we don't want the `NotControllerException` returned to the client, we need to see it in order to do the check.
    
    fix 1: Unwrap the `CompletionException` before encoding the exception as an error.
    
    2. Fixing the 1st issue alone does not fix the bug. With the 1st issue fixed, the client now gets `NotControllerException` and keeps retrying until it times out. So why doesn't it hit flow `3.1` above, given that it has a `NotControllerException`? The reason is that we wrap the original request in an `EnvelopeRequest` and forward it to the active controller. So, after the active controller completed the request, responded with `NotControllerException`, and the [...]
    
    fix 2: Make the envelope response return `NotControllerException` if the controller response has `NotControllerException`, so that we can catch the `NotControllerException` on the envelope response and update the active controller.
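
    To illustrate fix 1, here is a minimal, JDK-only sketch of the unwrapping. `UnwrapSketch` and `errorNameFor` are hypothetical stand-ins (the latter for a type-based mapping such as `Errors.forException`), `IllegalStateException` stands in for `NotControllerException`, and the null check on the cause is an addition of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch of unwrapping future-related wrapper exceptions. */
final class UnwrapSketch {
    /** Returns the cause of a CompletionException/ExecutionException, or t itself otherwise. */
    static Throwable unwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    /** Stand-in for a type-based error mapping: only the bare exception type matches. */
    static String errorNameFor(Throwable t) {
        return t instanceof IllegalStateException ? "NOT_CONTROLLER" : "UNKNOWN_SERVER_ERROR";
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("not the active controller"));
        try {
            future.join();
        } catch (CompletionException e) {
            // join() reports the failure wrapped in a CompletionException.
            System.out.println(errorNameFor(e));          // prints UNKNOWN_SERVER_ERROR
            System.out.println(errorNameFor(unwrap(e)));  // prints NOT_CONTROLLER
        }
    }
}
```

    Without the unwrap step the type-based mapping only sees the wrapper, which is why the generic error was returned.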
    
    Reviewers: wenbingshen <[email protected]>, Ismael Juma <[email protected]>, dengziming <[email protected]>, Jason Gustafson <[email protected]>
---
 .../org/apache/kafka/common/requests/ApiError.java | 14 +++-
 .../apache/kafka/common/requests/ApiErrorTest.java | 82 ++++++++++++++++++
 .../main/scala/kafka/network/RequestChannel.scala  | 13 ++-
 .../server/BrokerToControllerChannelManager.scala  |  2 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  1 +
 .../integration/kafka/server/RaftClusterTest.scala |  2 +-
 .../BrokerToControllerRequestThreadTest.scala      | 81 +++++++++++++++++-
 .../unit/kafka/network/RequestChannelTest.scala    | 98 +++++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 44 ++--------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 55 +++++++++++-
 10 files changed, 338 insertions(+), 54 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 38712ad..0196653 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.util.Objects;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
@@ -36,10 +38,18 @@ public class ApiError {
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncoded = t;
+        // Get the underlying cause for common exception types from the concurrent library.
+        // This is useful to handle cases where exceptions may be raised from a future or a
+        // completion stage (as might be the case for requests sent to the controller in `ControllerApis`)
+        if (t instanceof CompletionException || t instanceof ExecutionException) {
+            throwableToBeEncoded = t.getCause();
+        }
         // Avoid populating the error message if it's a generic one. Also don't populate error
         // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
-        Errors error = Errors.forException(t);
-        String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage();
+        Errors error = Errors.forException(throwableToBeEncoded);
+        String message = error == Errors.UNKNOWN_SERVER_ERROR ||
+            error.message().equals(throwableToBeEncoded.getMessage()) ? null : throwableToBeEncoded.getMessage();
         return new ApiError(error, message);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
new file mode 100644
index 0000000..8b0aa47
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ApiErrorTest {
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedErrors, String expectedMsg) {
+        ApiError apiError = ApiError.fromThrowable(t);
+        assertEquals(apiError.error(), expectedErrors);
+        assertEquals(apiError.message(), expectedMsg);
+    }
+
+    private static Collection<Arguments> parameters() {
+        List<Arguments> arguments = new ArrayList<>();
+
+        arguments.add(Arguments.of(
+            new UnknownServerException("Don't leak sensitive information "), Errors.UNKNOWN_SERVER_ERROR, null));
+
+        arguments.add(Arguments.of(
+            new NotEnoughReplicasException(), Errors.NOT_ENOUGH_REPLICAS, null));
+
+        // avoid populating the error message if it's a generic one
+        arguments.add(Arguments.of(
+            new UnknownTopicOrPartitionException(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()), Errors.UNKNOWN_TOPIC_OR_PARTITION, null));
+
+        String notCoordinatorErrorMsg = "Not coordinator";
+        arguments.add(Arguments.of(
+            new NotCoordinatorException(notCoordinatorErrorMsg), Errors.NOT_COORDINATOR, notCoordinatorErrorMsg));
+
+        String notControllerErrorMsg = "Not controller";
+        // test the NotControllerException is wrapped in the CompletionException, should return correct error
+        arguments.add(Arguments.of(
+            new CompletionException(new NotControllerException(notControllerErrorMsg)), Errors.NOT_CONTROLLER, notControllerErrorMsg));
+
+        String requestTimeoutErrorMsg = "request time out";
+        // test the TimeoutException is wrapped in the ExecutionException, should return correct error
+        arguments.add(Arguments.of(
+            new ExecutionException(new TimeoutException(requestTimeoutErrorMsg)), Errors.REQUEST_TIMED_OUT, requestTimeoutErrorMsg));
+
+        // test the exception not in the Errors list, should return UNKNOWN_SERVER_ERROR
+        arguments.add(Arguments.of(new IOException(), Errors.UNKNOWN_SERVER_ERROR, null));
+
+        return arguments;
+    }
+}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2530abf..5e456b0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -20,7 +20,6 @@ package kafka.network
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent._
-
 import com.fasterxml.jackson.databind.JsonNode
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
@@ -32,6 +31,7 @@ import kafka.utils.Implicits._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.EnvelopeResponseData
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ObjectSerializationCache}
 import org.apache.kafka.common.requests._
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error
+            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))
+          } else {
+            val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
+            new EnvelopeResponse(responseBytes, Errors.NONE)
+          }
           request.context.buildResponseSend(envelopeResponse)
         case None =>
           context.buildResponseSend(abstractResponse)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index fb8fc91..132a2cc 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ApiMessageAndVersion
 
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 96822e8..2c6557f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -22,6 +22,7 @@ import java.util.Collections
 import java.util.Map.Entry
 import java.util.concurrent.{CompletableFuture, ExecutionException}
 import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
+
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
index 1b5c008..fce5af9 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
@@ -29,7 +29,7 @@ import java.util
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@Timeout(120000)
+@Timeout(120)
 class RaftClusterTest {
 
   @Test
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index 46f329e..3297ec0 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -17,20 +17,23 @@
 
 package kafka.server
 
+import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
-
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient}
+import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions}
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.MockTime
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+
 class BrokerToControllerRequestThreadTest {
 
   @Test
@@ -211,6 +214,76 @@ class BrokerToControllerRequestThreadTest {
   }
 
   @Test
+  def testEnvelopeResponseWithNotControllerError(): Unit = {
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val oldControllerId = 1
+    val newControllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+    // enable envelope API
+    mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, 0.toShort, 0.toShort))
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val port = 1234
+    val oldController = new Node(oldControllerId, "host1", port)
+    val newController = new Node(newControllerId, "host2", port)
+
+    when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController))
+
+    // create an envelopeResponse with NOT_CONTROLLER error
+    val envelopeResponseWithNotControllerError = new EnvelopeResponse(
+      new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()))
+
+    // response for retry request after receiving NOT_CONTROLLER error
+    val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
+
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
+      config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.started = true
+
+    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true)
+    val kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
+
+    // build an EnvelopeRequest by dummy data
+    val envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0),
+      kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes)
+
+    val queueItem = BrokerToControllerQueueItem(
+      time.milliseconds(),
+      envelopeRequestBuilder,
+      completionHandler
+    )
+
+    testRequestThread.enqueue(queueItem)
+    // initialize to the controller
+    testRequestThread.doWork()
+
+    val oldBrokerNode = new Node(oldControllerId, "host1", port)
+    assertEquals(Some(oldBrokerNode), testRequestThread.activeControllerAddress())
+
+    // send and process the envelope request
+    mockClient.prepareResponse((body: AbstractRequest) => {
+      body.isInstanceOf[EnvelopeRequest]
+    }, envelopeResponseWithNotControllerError)
+    testRequestThread.doWork()
+    // expect to reset the activeControllerAddress after finding the NOT_CONTROLLER error
+    assertEquals(None, testRequestThread.activeControllerAddress())
+    // reinitialize the controller to a different node
+    testRequestThread.doWork()
+    // process the request again
+    mockClient.prepareResponse(expectedResponse)
+    testRequestThread.doWork()
+
+    val newControllerNode = new Node(newControllerId, "host2", port)
+    assertEquals(Some(newControllerNode), testRequestThread.activeControllerAddress())
+
+    assertTrue(completionHandler.completed.get())
+  }
+
+  @Test
   def testRetryTimeout(): Unit = {
     val time = new MockTime()
     val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 78618c2..881ea65 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -22,26 +22,38 @@ import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Collections
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
-import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, RequestTestUtils}
 import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
 import org.easymock.EasyMock._
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
+import org.mockito.{ArgumentCaptor, Mockito}
 
+import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class RequestChannelTest {
+  private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
+  private val clientId = "id"
+  private val principalSerde = new KafkaPrincipalSerde() {
+    override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
+    override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+  }
+  private val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
 
   @Test
   def testAlterRequests(): Unit = {
@@ -179,6 +191,86 @@ class RequestChannelTest {
     assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
   }
 
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response with REQUEST_TIMED_OUT error
+    val responseWithTimeoutError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.REQUEST_TIMED_OUT),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithTimeoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response with NOT_CONTROLLER error
+    val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithNotControllerError)
+
+    // expect the envelopeResponse result has NOT_CONTROLLER error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  }
+
+  private def buildMetadataRequest(): AbstractRequest = {
+    val resourceName = "topic-1"
+    val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      clientId, 0)
+
+    new MetadataRequest.Builder(Collections.singletonList(resourceName), true).build(header.apiVersion)
+  }
+
+  private def buildForwardRequestWithEnvelopeRequestAttached(request: AbstractRequest): RequestChannel.Request = {
+    val envelopeRequest = TestUtils.buildRequestWithEnvelope(
+      request, principalSerde, requestChannelMetrics, System.nanoTime(), shouldSpyRequestContext = true)
+
+    TestUtils.buildRequestWithEnvelope(
+      request, principalSerde, requestChannelMetrics, System.nanoTime(), envelope = Option(envelopeRequest))
+  }
+
   private def isValidJson(str: String): Boolean = {
     try {
       val mapper = new ObjectMapper
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f846da4..e454229 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,7 +23,6 @@ import java.util
 import java.util.Arrays.asList
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Optional, Properties, Random}
-
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}
@@ -301,7 +300,9 @@ class KafkaApisTest {
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)
 
-    val request = buildRequestWithEnvelope(alterConfigsRequest, fromPrivilegedListener = true)
+    val request = TestUtils.buildRequestWithEnvelope(
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+
     val capturedResponse = EasyMock.newCapture[AbstractResponse]()
     val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
 
@@ -335,7 +336,9 @@ class KafkaApisTest {
 
     EasyMock.expect(controller.isActive).andReturn(true)
 
-    val request = buildRequestWithEnvelope(leaveGroupRequest, fromPrivilegedListener = true)
+    val request = TestUtils.buildRequestWithEnvelope(
+      leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+
     val capturedResponse = expectNoThrottling(request)
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
@@ -388,8 +391,8 @@ class KafkaApisTest {
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false)
       .build(requestHeader.apiVersion)
 
-    val request = buildRequestWithEnvelope(alterConfigsRequest,
-      fromPrivilegedListener = fromPrivilegedListener)
+    val request = TestUtils.buildRequestWithEnvelope(
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener)
 
     val capturedResponse = EasyMock.newCapture[AbstractResponse]()
     if (shouldCloseConnection) {
@@ -3248,37 +3251,6 @@ class KafkaApisTest {
     (writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
   }
 
-  private def buildRequestWithEnvelope(
-    request: AbstractRequest,
-    fromPrivilegedListener: Boolean,
-    principalSerde: KafkaPrincipalSerde = kafkaPrincipalSerde
-  ): RequestChannel.Request = {
-    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-
-    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
-    val requestBuffer = request.serializeWithHeader(requestHeader)
-
-    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
-    val envelopeBuffer = new EnvelopeRequest.Builder(
-      requestBuffer,
-      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
-      InetAddress.getLocalHost.getAddress
-    ).build().serializeWithHeader(envelopeHeader)
-    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
-      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
-      fromPrivilegedListener, Optional.of(principalSerde))
-
-    RequestHeader.parse(envelopeBuffer)
-    new RequestChannel.Request(
-      processor = 1,
-      context = envelopeContext,
-      startTimeNanos = time.nanoseconds(),
-      memoryPool = MemoryPool.NONE,
-      buffer = envelopeBuffer,
-      metrics = requestChannelMetrics
-    )
-  }
-
   private def buildRequest(request: AbstractRequest,
                            listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                            fromPrivilegedListener: Boolean = false,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 51ea34a..d21adcc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.security.cert.X509Certificate
 import java.time.Duration
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-import java.util.{Arrays, Collections, Properties}
+import java.util.{Arrays, Collections, Optional, Properties}
 import com.yammer.metrics.core.Meter
 
 import javax.net.ssl.X509TrustManager
@@ -34,6 +34,7 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log._
 import kafka.metrics.KafkaYammerMetrics
+import kafka.network.RequestChannel
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
@@ -50,14 +51,16 @@ import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
 import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, Mode}
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -68,7 +71,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.jupiter.api.Assertions._
+import org.mockito.Mockito
 
+import java.net.InetAddress
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Map, Seq, mutable}
 import scala.concurrent.duration.FiniteDuration
@@ -1854,4 +1859,46 @@ object TestUtils extends Logging {
       authorizer, resource)
   }
 
+  def buildRequestWithEnvelope(request: AbstractRequest,
+                               principalSerde: KafkaPrincipalSerde,
+                               requestChannelMetrics: RequestChannel.Metrics,
+                               startTimeNanos: Long,
+                               fromPrivilegedListener: Boolean = true,
+                               shouldSpyRequestContext: Boolean = false,
+                               envelope: Option[RequestChannel.Request] = None
+                              ): RequestChannel.Request = {
+    val clientId = "id"
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
+    val requestBuffer = request.serializeWithHeader(requestHeader)
+
+    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val envelopeBuffer = new EnvelopeRequest.Builder(
+      requestBuffer,
+      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+      InetAddress.getLocalHost.getAddress
+    ).build().serializeWithHeader(envelopeHeader)
+
+    RequestHeader.parse(envelopeBuffer)
+
+    var requestContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
+      fromPrivilegedListener, Optional.of(principalSerde))
+
+    if (shouldSpyRequestContext) {
+      requestContext = Mockito.spy(requestContext)
+    }
+
+    new RequestChannel.Request(
+      processor = 1,
+      context = requestContext,
+      startTimeNanos = startTimeNanos,
+      memoryPool = MemoryPool.NONE,
+      buffer = envelopeBuffer,
+      metrics = requestChannelMetrics,
+      envelope = envelope
+    )
+  }
+
 }
