This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0af0f0d  KAFKA-12677: Return not_controller error in envelope response itself in KRaft mode (#10794)
0af0f0d is described below
commit 0af0f0daba39b451d0781b27f66e07a5a5fdae14
Author: Luke Chen <[email protected]>
AuthorDate: Tue Jul 13 00:17:46 2021 +0800
KAFKA-12677: Return not_controller error in envelope response itself in KRaft mode (#10794)
In KRaft mode, the flow for sending a request from a client to the controller is as follows:
1. The client sends the request to a random controller (e.g. controller A).
2. Controller A forwards the request to the active controller (e.g. controller B) to handle it.
3. After the active controller B completes the request, controller A receives the response and performs a check:
3.1. If the response is "disconnected" or has a "NOT_CONTROLLER" error, the cached active controller has changed. So, controller A clears the cached active controller and waits for the next retry to get the updated active controller from `controllerNodeProvider`.
3.2. Otherwise, it completes the request and responds back to the client.
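The check in steps 3.1 and 3.2 can be sketched as follows (a minimal illustration with hypothetical names, not the actual `BrokerToControllerRequestThread` code):

```java
import java.util.Optional;

public class ForwardingResponseCheck {
    // Cached active controller, as previously resolved from controllerNodeProvider.
    private Optional<String> activeController = Optional.of("B-controller");

    // Step 3 above: inspect the response received by the forwarding controller.
    // Returns true when the response can be completed back to the client.
    public boolean onResponse(boolean disconnected, boolean notController) {
        if (disconnected || notController) {
            // 3.1: the cached active controller is stale; clear it so the next
            // retry re-resolves the active controller from controllerNodeProvider.
            activeController = Optional.empty();
            return false;
        }
        // 3.2: complete the request and respond back to the client.
        return true;
    }

    public Optional<String> activeController() {
        return activeController;
    }
}
```

The bug described below means the NOT_CONTROLLER signal never reached this check when the request was forwarded inside an envelope.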
In this bug, there are 2 issues:
1. The "NOT_CONTROLLER" error is not correctly sent back to the requester; `UNKNOWN_SERVER_ERROR` is returned instead. The reason is that the `NotControllerException` is wrapped in a `CompletionException` when the `Future` completes exceptionally, and the `CompletionException` does not match any of the `Errors` we defined, so `UNKNOWN_SERVER_ERROR` is returned. Even if we don't want to return the `NotControllerException` to the client, we need to see it to perform the check in step 3.1.
fix 1: unwrap the `CompletionException` before encoding the exception into an error.
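The unwrapping in fix 1 amounts to the following (a minimal sketch of the idea; the real change lives in `ApiError.fromThrowable` in the diff below):

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {
    // Peel off the wrapper added by the java.util.concurrent machinery so the
    // underlying API exception (e.g. NotControllerException) is the one that
    // gets mapped to an error code, instead of falling through to
    // UNKNOWN_SERVER_ERROR.
    public static Throwable maybeUnwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }
}
```

With this, the error-mapping step sees the cause rather than the `CompletionException`, so NOT_CONTROLLER is encoded instead of UNKNOWN_SERVER_ERROR.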
2. Even with the 1st bug fixed, this issue remains. After the 1st fix, the client can now successfully get the `NotControllerException` and keeps retrying... until it times out. So why doesn't it hit the flow in `3.1` mentioned above, since it has a `NotControllerException`? The reason is that we wrapped the original request in an `EnvelopeRequest` and forwarded it to the active controller. So, after the active controller completed the request, responded with `NotControllerException`, and the [...]
fix 2: make the envelope response itself return `NotControllerException` if the wrapped controller response has a `NotControllerException`, so that we can catch the `NotControllerException` on the envelope response and update the active controller.
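The rule in fix 2 can be sketched as follows (hypothetical error constants standing in for Kafka's `Errors` enum; the real change is in `RequestChannel.buildResponseSend` below):

```java
import java.util.Map;

public class EnvelopeErrorSketch {
    // Stand-ins for the codes of Errors.NONE and Errors.NOT_CONTROLLER.
    public static final short NONE = 0;
    public static final short NOT_CONTROLLER = 41;

    // If the wrapped controller response carries NOT_CONTROLLER, surface it on
    // the envelope response itself so the forwarding broker (the requester)
    // clears its cached active controller and retries against the new one.
    public static short envelopeErrorFor(Map<Short, Integer> innerErrorCounts) {
        return innerErrorCounts.containsKey(NOT_CONTROLLER) ? NOT_CONTROLLER : NONE;
    }
}
```

Any other inner error (e.g. REQUEST_TIMED_OUT) still yields an error-free envelope, since only NOT_CONTROLLER is meaningful to the forwarding layer.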
Reviewers: wenbingshen <[email protected]>, Ismael Juma <[email protected]>, dengziming <[email protected]>, Jason Gustafson <[email protected]>
---
.../org/apache/kafka/common/requests/ApiError.java | 14 +++-
.../apache/kafka/common/requests/ApiErrorTest.java | 82 ++++++++++++++++++
.../main/scala/kafka/network/RequestChannel.scala | 13 ++-
.../server/BrokerToControllerChannelManager.scala | 2 +-
.../main/scala/kafka/server/ControllerApis.scala | 1 +
.../integration/kafka/server/RaftClusterTest.scala | 2 +-
.../BrokerToControllerRequestThreadTest.scala | 81 +++++++++++++++++-
.../unit/kafka/network/RequestChannelTest.scala | 98 +++++++++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 44 ++--------
.../test/scala/unit/kafka/utils/TestUtils.scala | 55 +++++++++++-
10 files changed, 338 insertions(+), 54 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 38712ad..0196653 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;
import java.util.Objects;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
/**
 * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only
@@ -36,10 +38,18 @@ public class ApiError {
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncoded = t;
+        // Get the underlying cause for common exception types from the concurrent library.
+        // This is useful to handle cases where exceptions may be raised from a future or a
+        // completion stage (as might be the case for requests sent to the controller in `ControllerApis`)
+        if (t instanceof CompletionException || t instanceof ExecutionException) {
+            throwableToBeEncoded = t.getCause();
+        }
         // Avoid populating the error message if it's a generic one. Also don't populate error
         // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information.
-        Errors error = Errors.forException(t);
-        String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage();
+        Errors error = Errors.forException(throwableToBeEncoded);
+        String message = error == Errors.UNKNOWN_SERVER_ERROR ||
+            error.message().equals(throwableToBeEncoded.getMessage()) ? null : throwableToBeEncoded.getMessage();
         return new ApiError(error, message);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
new file mode 100644
index 0000000..8b0aa47
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiErrorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ApiErrorTest {
+
+    @ParameterizedTest
+    @MethodSource("parameters")
+    public void fromThrowableShouldReturnCorrectError(Throwable t, Errors expectedErrors, String expectedMsg) {
+        ApiError apiError = ApiError.fromThrowable(t);
+        assertEquals(apiError.error(), expectedErrors);
+        assertEquals(apiError.message(), expectedMsg);
+    }
+
+    private static Collection<Arguments> parameters() {
+        List<Arguments> arguments = new ArrayList<>();
+
+        arguments.add(Arguments.of(
+            new UnknownServerException("Don't leak sensitive information "), Errors.UNKNOWN_SERVER_ERROR, null));
+
+        arguments.add(Arguments.of(
+            new NotEnoughReplicasException(), Errors.NOT_ENOUGH_REPLICAS, null));
+
+        // avoid populating the error message if it's a generic one
+        arguments.add(Arguments.of(
+            new UnknownTopicOrPartitionException(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()), Errors.UNKNOWN_TOPIC_OR_PARTITION, null));
+
+        String notCoordinatorErrorMsg = "Not coordinator";
+        arguments.add(Arguments.of(
+            new NotCoordinatorException(notCoordinatorErrorMsg), Errors.NOT_COORDINATOR, notCoordinatorErrorMsg));
+
+        String notControllerErrorMsg = "Not controller";
+        // test the NotControllerException is wrapped in the CompletionException, should return correct error
+        arguments.add(Arguments.of(
+            new CompletionException(new NotControllerException(notControllerErrorMsg)), Errors.NOT_CONTROLLER, notControllerErrorMsg));
+
+        String requestTimeoutErrorMsg = "request time out";
+        // test the TimeoutException is wrapped in the ExecutionException, should return correct error
+        arguments.add(Arguments.of(
+            new ExecutionException(new TimeoutException(requestTimeoutErrorMsg)), Errors.REQUEST_TIMED_OUT, requestTimeoutErrorMsg));
+
+        // test the exception not in the Errors list, should return UNKNOWN_SERVER_ERROR
+        arguments.add(Arguments.of(new IOException(), Errors.UNKNOWN_SERVER_ERROR, null));
+
+        return arguments;
+    }
+}
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2530abf..5e456b0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -20,7 +20,6 @@ package kafka.network
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent._
-
import com.fasterxml.jackson.databind.JsonNode
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
@@ -32,6 +31,7 @@ import kafka.utils.Implicits._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.EnvelopeResponseData
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, ObjectSerializationCache}
 import org.apache.kafka.common.requests._
@@ -124,8 +124,15 @@ object RequestChannel extends Logging {
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
-          val envelopeResponse = new EnvelopeResponse(responseBytes, Errors.NONE)
+          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+            // Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error
+            // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
+            new EnvelopeResponse(new EnvelopeResponseData()
+              .setErrorCode(Errors.NOT_CONTROLLER.code()))
+          } else {
+            val responseBytes = context.buildResponseEnvelopePayload(abstractResponse)
+            new EnvelopeResponse(responseBytes, Errors.NONE)
+          }
           request.context.buildResponseSend(envelopeResponse)
         case None =>
           context.buildResponseSend(abstractResponse)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index fb8fc91..132a2cc 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ApiMessageAndVersion
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 96822e8..2c6557f 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -22,6 +22,7 @@ import java.util.Collections
import java.util.Map.Entry
import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
+
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
index 1b5c008..fce5af9 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
@@ -29,7 +29,7 @@ import java.util
import java.util.Collections
import scala.jdk.CollectionConverters._
-@Timeout(120000)
+@Timeout(120)
class RaftClusterTest {
@Test
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index 46f329e..3297ec0 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -17,20 +17,23 @@
package kafka.server
+import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
-
import kafka.utils.TestUtils
-import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient}
+import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions}
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, MetadataResponse, RequestTestUtils}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
+
class BrokerToControllerRequestThreadTest {
@Test
@@ -211,6 +214,76 @@ class BrokerToControllerRequestThreadTest {
}
@Test
+  def testEnvelopeResponseWithNotControllerError(): Unit = {
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val oldControllerId = 1
+    val newControllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+    // enable envelope API
+    mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.ENVELOPE.id, 0.toShort, 0.toShort))
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val port = 1234
+    val oldController = new Node(oldControllerId, "host1", port)
+    val newController = new Node(newControllerId, "host2", port)
+
+    when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController))
+
+    // create an envelopeResponse with NOT_CONTROLLER error
+    val envelopeResponseWithNotControllerError = new EnvelopeResponse(
+      new EnvelopeResponseData().setErrorCode(Errors.NOT_CONTROLLER.code()))
+
+    // response for retry request after receiving NOT_CONTROLLER error
+    val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
+
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
+      config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.started = true
+
+    val completionHandler = new TestRequestCompletionHandler(Some(expectedResponse))
+    val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "principal", true)
+    val kafkaPrincipalBuilder = new DefaultKafkaPrincipalBuilder(null, null)
+
+    // build an EnvelopeRequest by dummy data
+    val envelopeRequestBuilder = new EnvelopeRequest.Builder(ByteBuffer.allocate(0),
+      kafkaPrincipalBuilder.serialize(kafkaPrincipal), "client-address".getBytes)
+
+    val queueItem = BrokerToControllerQueueItem(
+      time.milliseconds(),
+      envelopeRequestBuilder,
+      completionHandler
+    )
+
+    testRequestThread.enqueue(queueItem)
+    // initialize to the controller
+    testRequestThread.doWork()
+
+    val oldBrokerNode = new Node(oldControllerId, "host1", port)
+    assertEquals(Some(oldBrokerNode), testRequestThread.activeControllerAddress())
+
+    // send and process the envelope request
+    mockClient.prepareResponse((body: AbstractRequest) => {
+      body.isInstanceOf[EnvelopeRequest]
+    }, envelopeResponseWithNotControllerError)
+    testRequestThread.doWork()
+    // expect to reset the activeControllerAddress after finding the NOT_CONTROLLER error
+    assertEquals(None, testRequestThread.activeControllerAddress())
+    // reinitialize the controller to a different node
+    testRequestThread.doWork()
+    // process the request again
+    mockClient.prepareResponse(expectedResponse)
+    testRequestThread.doWork()
+
+    val newControllerNode = new Node(newControllerId, "host2", port)
+    assertEquals(Some(newControllerNode), testRequestThread.activeControllerAddress())
+
+    assertTrue(completionHandler.completed.get())
+  }
+
+  @Test
   def testRetryTimeout(): Unit = {
     val time = new MockTime()
     val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 78618c2..881ea65 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -22,26 +22,38 @@ import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.Collections
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import kafka.network
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
-import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.network.{ByteBufferSend, ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, RequestTestUtils}
 import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
 import org.easymock.EasyMock._
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
+import org.mockito.{ArgumentCaptor, Mockito}
+import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
 class RequestChannelTest {
+  private val requestChannelMetrics: RequestChannel.Metrics = mock(classOf[RequestChannel.Metrics])
+  private val clientId = "id"
+  private val principalSerde = new KafkaPrincipalSerde() {
+    override def serialize(principal: KafkaPrincipal): Array[Byte] = Utils.utf8(principal.toString)
+    override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
+  }
+  private val mockSend: ByteBufferSend = Mockito.mock(classOf[ByteBufferSend])
@Test
def testAlterRequests(): Unit = {
@@ -179,6 +191,86 @@ class RequestChannelTest {
assertTrue(isValidJson(RequestConvertToJson.request(alterConfigs.loggableRequest).toString))
}
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoError(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response without error
+    val responseWithoutError = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNoErrorIfInnerResponseHasNoNotControllerError(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response with REQUEST_TIMED_OUT error
+    val responseWithTimeoutError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.REQUEST_TIMED_OUT),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithTimeoutError)
+
+    // expect the envelopeResponse result without error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NONE))
+  }
+
+  @Test
+  def testEnvelopeBuildResponseSendShouldReturnNotControllerErrorIfInnerResponseHasOne(): Unit = {
+    val channelRequest = buildForwardRequestWithEnvelopeRequestAttached(buildMetadataRequest())
+
+    val envelopeResponseArgumentCaptor = ArgumentCaptor.forClass(classOf[EnvelopeResponse])
+
+    Mockito.doAnswer(_ => mockSend)
+      .when(channelRequest.envelope.get.context).buildResponseSend(envelopeResponseArgumentCaptor.capture())
+
+    // create an inner response with NOT_CONTROLLER error
+    val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2,
+      Collections.singletonMap("a", Errors.NOT_CONTROLLER),
+      Collections.singletonMap("a", 2))
+
+    // build an envelope response
+    channelRequest.buildResponseSend(responseWithNotControllerError)
+
+    // expect the envelopeResponse result has NOT_CONTROLLER error
+    val capturedValue: EnvelopeResponse = envelopeResponseArgumentCaptor.getValue
+    assertTrue(capturedValue.error().equals(Errors.NOT_CONTROLLER))
+  }
+
+  private def buildMetadataRequest(): AbstractRequest = {
+    val resourceName = "topic-1"
+    val header = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
+      clientId, 0)
+
+    new MetadataRequest.Builder(Collections.singletonList(resourceName), true).build(header.apiVersion)
+  }
+
+  private def buildForwardRequestWithEnvelopeRequestAttached(request: AbstractRequest): RequestChannel.Request = {
+    val envelopeRequest = TestUtils.buildRequestWithEnvelope(
+      request, principalSerde, requestChannelMetrics, System.nanoTime(), shouldSpyRequestContext = true)
+
+    TestUtils.buildRequestWithEnvelope(
+      request, principalSerde, requestChannelMetrics, System.nanoTime(), envelope = Option(envelopeRequest))
+  }
+
private def isValidJson(str: String): Boolean = {
try {
val mapper = new ObjectMapper
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f846da4..e454229 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,7 +23,6 @@ import java.util
import java.util.Arrays.asList
import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties, Random}
-
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
@@ -301,7 +300,9 @@ class KafkaApisTest {
       Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)
 
-    val request = buildRequestWithEnvelope(alterConfigsRequest, fromPrivilegedListener = true)
+    val request = TestUtils.buildRequestWithEnvelope(
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+
     val capturedResponse = EasyMock.newCapture[AbstractResponse]()
     val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
@@ -335,7 +336,9 @@ class KafkaApisTest {
     EasyMock.expect(controller.isActive).andReturn(true)
 
-    val request = buildRequestWithEnvelope(leaveGroupRequest, fromPrivilegedListener = true)
+    val request = TestUtils.buildRequestWithEnvelope(
+      leaveGroupRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+
     val capturedResponse = expectNoThrottling(request)
 
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller)
@@ -388,8 +391,8 @@ class KafkaApisTest {
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false)
       .build(requestHeader.apiVersion)
 
-    val request = buildRequestWithEnvelope(alterConfigsRequest,
-      fromPrivilegedListener = fromPrivilegedListener)
+    val request = TestUtils.buildRequestWithEnvelope(
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener)
 
     val capturedResponse = EasyMock.newCapture[AbstractResponse]()
     if (shouldCloseConnection) {
@@ -3248,37 +3251,6 @@ class KafkaApisTest {
     (writeTxnMarkersRequest, buildRequest(writeTxnMarkersRequest))
   }
 
-  private def buildRequestWithEnvelope(
-    request: AbstractRequest,
-    fromPrivilegedListener: Boolean,
-    principalSerde: KafkaPrincipalSerde = kafkaPrincipalSerde
-  ): RequestChannel.Request = {
-    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-
-    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
-    val requestBuffer = request.serializeWithHeader(requestHeader)
-
-    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
-    val envelopeBuffer = new EnvelopeRequest.Builder(
-      requestBuffer,
-      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
-      InetAddress.getLocalHost.getAddress
-    ).build().serializeWithHeader(envelopeHeader)
-    val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
-      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
-      fromPrivilegedListener, Optional.of(principalSerde))
-
-    RequestHeader.parse(envelopeBuffer)
-    new RequestChannel.Request(
-      processor = 1,
-      context = envelopeContext,
-      startTimeNanos = time.nanoseconds(),
-      memoryPool = MemoryPool.NONE,
-      buffer = envelopeBuffer,
-      metrics = requestChannelMetrics
-    )
-  }
-
   private def buildRequest(request: AbstractRequest,
                            listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                            fromPrivilegedListener: Boolean = false,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 51ea34a..d21adcc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -25,7 +25,7 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
-import java.util.{Arrays, Collections, Properties}
+import java.util.{Arrays, Collections, Optional, Properties}
import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
@@ -34,6 +34,7 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
+import kafka.network.RequestChannel
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
@@ -50,14 +51,16 @@ import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
 import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ListenerName, Mode}
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
 import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -68,7 +71,9 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.jupiter.api.Assertions._
+import org.mockito.Mockito
+
+import java.net.InetAddress
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
@@ -1854,4 +1859,46 @@ object TestUtils extends Logging {
authorizer, resource)
}
+  def buildRequestWithEnvelope(request: AbstractRequest,
+                               principalSerde: KafkaPrincipalSerde,
+                               requestChannelMetrics: RequestChannel.Metrics,
+                               startTimeNanos: Long,
+                               fromPrivilegedListener: Boolean = true,
+                               shouldSpyRequestContext: Boolean = false,
+                               envelope: Option[RequestChannel.Request] = None
+                              ): RequestChannel.Request = {
+    val clientId = "id"
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+
+    val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
+    val requestBuffer = request.serializeWithHeader(requestHeader)
+
+    val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
+    val envelopeBuffer = new EnvelopeRequest.Builder(
+      requestBuffer,
+      principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
+      InetAddress.getLocalHost.getAddress
+    ).build().serializeWithHeader(envelopeHeader)
+
+    RequestHeader.parse(envelopeBuffer)
+
+    var requestContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost,
+      KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
+      fromPrivilegedListener, Optional.of(principalSerde))
+
+    if (shouldSpyRequestContext) {
+      requestContext = Mockito.spy(requestContext)
+    }
+
+    new RequestChannel.Request(
+      processor = 1,
+      context = requestContext,
+      startTimeNanos = startTimeNanos,
+      memoryPool = MemoryPool.NONE,
+      buffer = envelopeBuffer,
+      metrics = requestChannelMetrics,
+      envelope = envelope
+    )
+  }
+
}