This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b1d83e2b04c Revert "Revert "KAFKA-15661: KIP-951: Server side changes (#14444)" (#14738)" (#14747)
b1d83e2b04c is described below
commit b1d83e2b04c92cebb5687c55ef2797186dbd0cf2
Author: Crispin Bernier <[email protected]>
AuthorDate: Thu Nov 16 18:42:34 2023 -0500
Revert "Revert "KAFKA-15661: KIP-951: Server side changes (#14444)"
(#14738)" (#14747)
This KIP-951 commit was reverted in #14738 to investigate the
org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest test
failure. A fix for that failure was merged in #14757, so this change is now
unreverted.
This reverts commit a98bd7d.
Reviewers: Justine Olshan <[email protected]>, Mayank Shekhar Narula <[email protected]>
---
.../kafka/common/requests/ProduceResponse.java | 26 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 53 +++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 270 ++++++++++++++++++++-
3 files changed, 342 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index a8c2a801e4c..186ad9b80a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.ProduceResponseData.LeaderIdAndEpoch;
@@ -72,7 +73,7 @@ public class ProduceResponse extends AbstractResponse {
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
- this(responses, DEFAULT_THROTTLE_TIME);
+ this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
}
/**
@@ -83,10 +84,23 @@ public class ProduceResponse extends AbstractResponse {
*/
@Deprecated
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
- this(toData(responses, throttleTimeMs));
+ this(toData(responses, throttleTimeMs, Collections.emptyList()));
}
- private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
+ /**
+ * Constructor for the latest version
+ * This is deprecated in favor of using the ProduceResponseData constructor, KafkaApis should switch to that
+ * in KAFKA-10730
+ * @param responses Produced data grouped by topic-partition
+ * @param throttleTimeMs Time in milliseconds the response was throttled
+ * @param nodeEndpoints List of node endpoints
+ */
+ @Deprecated
+ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
+ this(toData(responses, throttleTimeMs, nodeEndpoints));
+ }
+
+ private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs, List<Node> nodeEndpoints) {
ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
responses.forEach((tp, response) -> {
ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
@@ -110,6 +124,12 @@ public class ProduceResponse extends AbstractResponse {
.setBatchIndexErrorMessage(e.message))
.collect(Collectors.toList())));
});
+ nodeEndpoints.forEach(endpoint -> data.nodeEndpoints()
+ .add(new ProduceResponseData.NodeEndpoint()
+ .setNodeId(endpoint.id())
+ .setHost(endpoint.host())
+ .setPort(endpoint.port())
+ .setRack(endpoint.rack())));
return data;
}
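For illustration (this sketch is not part of the patch): a minimal example of how
the new constructor above could be exercised. The topic, endpoint values, and the
class name ProduceResponseEndpointSketch are hypothetical; the per-partition
leaderId/leaderEpoch hints are filled in separately by KafkaApis, as the next file
shows.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.ProduceResponse;
    import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;

    public class ProduceResponseEndpointSketch {
        public static void main(String[] args) {
            // Hypothetical partition whose leadership has moved away from this broker.
            TopicPartition tp = new TopicPartition("topic", 0);
            Map<TopicPartition, PartitionResponse> responses =
                Collections.singletonMap(tp, new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER));
            // Hypothetical endpoint of the new leader, so the producer can retry
            // against it without waiting for a full metadata refresh.
            List<Node> nodeEndpoints =
                Collections.singletonList(new Node(2, "broker2", 9092, "rack-a"));
            ProduceResponse response =
                new ProduceResponse(responses, 0 /* throttleTimeMs */, nodeEndpoints);
            // Prints the NodeEndpoint entries added by toData() above.
            System.out.println(response.data().nodeEndpoints());
        }
    }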
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bd0959d40d2..3f5a435d147 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -562,6 +562,23 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Option[Node])
+
+ private def getCurrentLeader(tp: TopicPartition, ln: ListenerName): LeaderNode = {
+ val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+ val (leaderId, leaderEpoch) = partitionInfoOrError match {
+ case Right(x) =>
+ (x.leaderReplicaIdOpt.getOrElse(-1), x.getLeaderEpoch)
+ case Left(x) =>
+ debug(s"Unable to retrieve local leaderId and Epoch with error $x,
falling back to metadata cache")
+ metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
+ case Some(pinfo) => (pinfo.leader(), pinfo.leaderEpoch())
+ case None => (-1, -1)
+ }
+ }
+ LeaderNode(leaderId, leaderEpoch, metadataCache.getAliveBrokerNode(leaderId, ln))
+ }
+
/**
* Handle a produce request
*/
@@ -614,6 +631,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses ++ invalidRequestResponses
var errorInResponse = false
+ val nodeEndpoints = new mutable.HashMap[Int, Node]
mergedResponseStatus.forKeyValue { (topicPartition, status) =>
if (status.error != Errors.NONE) {
errorInResponse = true
@@ -622,6 +640,20 @@ class KafkaApis(val requestChannel: RequestChannel,
request.header.clientId,
topicPartition,
status.error.exceptionName))
+
+ if (request.header.apiVersion >= 10) {
+ status.error match {
+ case Errors.NOT_LEADER_OR_FOLLOWER =>
+ val leaderNode = getCurrentLeader(topicPartition, request.context.listenerName)
+ leaderNode.node.foreach { node =>
+ nodeEndpoints.put(node.id(), node)
+ }
+ status.currentLeader
+ .setLeaderId(leaderNode.leaderId)
+ .setLeaderEpoch(leaderNode.leaderEpoch)
+ case _ =>
+ }
+ }
}
}
@@ -665,7 +697,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendNoOpResponseExemptThrottle(request)
}
} else {
- requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs), None)
+ requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None)
}
}
@@ -843,6 +875,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setRecords(unconvertedRecords)
.setPreferredReadReplica(partitionData.preferredReadReplica)
.setDivergingEpoch(partitionData.divergingEpoch)
+ .setCurrentLeader(partitionData.currentLeader())
}
}
}
@@ -851,6 +884,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def processResponseCallback(responsePartitionData: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicIdPartition, FetchResponseData.PartitionData]
val reassigningPartitions = mutable.Set[TopicIdPartition]()
+ val nodeEndpoints = new mutable.HashMap[Int, Node]
responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.orElse(null)
val lastStableOffset: Long = data.lastStableOffset.orElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
@@ -864,6 +898,21 @@ class KafkaApis(val requestChannel: RequestChannel,
.setAbortedTransactions(abortedTransactions)
.setRecords(data.records)
.setPreferredReadReplica(data.preferredReadReplica.orElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+
+ if (versionId >= 16) {
+ data.error match {
+ case Errors.NOT_LEADER_OR_FOLLOWER | Errors.FENCED_LEADER_EPOCH =>
+ val leaderNode = getCurrentLeader(tp.topicPartition(), request.context.listenerName)
+ leaderNode.node.foreach { node =>
+ nodeEndpoints.put(node.id(), node)
+ }
+ partitionData.currentLeader()
+ .setLeaderId(leaderNode.leaderId)
+ .setLeaderEpoch(leaderNode.leaderEpoch)
+ case _ =>
+ }
+ }
+
data.divergingEpoch.ifPresent(partitionData.setDivergingEpoch(_))
partitions.put(tp, partitionData)
}
@@ -887,7 +936,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Prepare fetch response from converted data
val response =
- FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData)
+ FetchResponse.of(unconvertedFetchResponse.error, throttleTimeMs, unconvertedFetchResponse.sessionId, convertedData, nodeEndpoints.values.toList.asJava)
// record the bytes out metrics only when the response is being sent
response.data.responses.forEach { topicResponse =>
topicResponse.partitions.forEach { data =>
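For illustration (not part of the patch): a rough sketch of how a reader of the
fetch path might consume the new-leader hints populated by the handler code above.
The class and method names below are hypothetical; the accessors come from the
generated FetchResponseData protocol classes that this change relies on.

    import org.apache.kafka.common.message.FetchResponseData;

    public class FetchLeaderHintSketch {
        // Hypothetical helper: logs the leader hints carried by a KIP-951 fetch response.
        static void logNewLeaderHints(FetchResponseData data) {
            data.responses().forEach(topic ->
                topic.partitions().forEach(partition -> {
                    FetchResponseData.LeaderIdAndEpoch leader = partition.currentLeader();
                    // A non-negative leaderId means the broker attached a new-leader hint.
                    if (leader.leaderId() >= 0) {
                        System.out.printf("partition %d: leader moved to %d (epoch %d)%n",
                            partition.partitionIndex(), leader.leaderId(), leader.leaderEpoch());
                    }
                }));
            // Endpoints of the reported leaders, so a client can reconnect without
            // waiting for a separate metadata round trip.
            data.nodeEndpoints().forEach(endpoint ->
                System.out.printf("node %d at %s:%d%n",
                    endpoint.nodeId(), endpoint.host(), endpoint.port()));
        }
    }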
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 17abdb0472d..41e67e61f5e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -24,9 +24,10 @@ import java.util.Arrays.asList
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.{Collections, Optional, OptionalInt, OptionalLong, Properties}
import kafka.api.LeaderAndIsr
-import kafka.cluster.Broker
+import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
+import kafka.log.UnifiedLog
import kafka.metrics.ClientMetricsTestUtils
import kafka.network.{RequestChannel, RequestMetrics}
import kafka.server.QuotaFactory.QuotaManagers
@@ -98,7 +99,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, FetchPartitionData, LogConfig}
class KafkaApisTest {
private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
@@ -2475,6 +2476,204 @@ class KafkaApisTest {
}
}
+ @Test
+ def testProduceResponseContainsNewLeaderOnNotLeaderOrFollower(): Unit = {
+ val topic = "topic"
+ addTopicToMetadataCache(topic, numPartitions = 2, numBrokers = 3)
+
+ for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
+
+ reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ val tp = new TopicPartition(topic, 0)
+ val partition = mock(classOf[Partition])
+ val newLeaderId = 2
+ val newLeaderEpoch = 5
+
+ val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
+ .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+ Collections.singletonList(new ProduceRequestData.TopicProduceData()
+ .setName(tp.topic).setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(tp.partition)
+ .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
+ .iterator))
+ .setAcks(1.toShort)
+ .setTimeoutMs(5000))
+ .build(version.toShort)
+ val request = buildRequest(produceRequest)
+
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(AppendOrigin.CLIENT),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any())
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+
+ when(replicaManager.getPartitionOrError(tp)).thenAnswer(_ => Right(partition))
+ when(partition.leaderReplicaIdOpt).thenAnswer(_ => Some(newLeaderId))
+ when(partition.getLeaderEpoch).thenAnswer(_ => newLeaderEpoch)
+
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+ createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[ProduceResponse](request)
+
+ assertEquals(1, response.data.responses.size)
+ val topicProduceResponse = response.data.responses.asScala.head
+ assertEquals(1, topicProduceResponse.partitionResponses.size)
+ val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partitionProduceResponse.errorCode))
+ assertEquals(newLeaderId, partitionProduceResponse.currentLeader.leaderId())
+ assertEquals(newLeaderEpoch, partitionProduceResponse.currentLeader.leaderEpoch())
+ assertEquals(1, response.data.nodeEndpoints.size)
+ val node = response.data.nodeEndpoints.asScala.head
+ assertEquals(2, node.nodeId)
+ assertEquals("broker2", node.host)
+ }
+ }
+
+ @Test
+ def testProduceResponseReplicaManagerLookupErrorOnNotLeaderOrFollower(): Unit = {
+ val topic = "topic"
+ addTopicToMetadataCache(topic, numPartitions = 2, numBrokers = 3)
+
+ for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
+
+ reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ val tp = new TopicPartition(topic, 0)
+
+ val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
+ .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+ Collections.singletonList(new ProduceRequestData.TopicProduceData()
+ .setName(tp.topic).setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(tp.partition)
+ .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
+ .iterator))
+ .setAcks(1.toShort)
+ .setTimeoutMs(5000))
+ .build(version.toShort)
+ val request = buildRequest(produceRequest)
+
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(AppendOrigin.CLIENT),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any())
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+
+ when(replicaManager.getPartitionOrError(tp)).thenAnswer(_ => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+ createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[ProduceResponse](request)
+
+ assertEquals(1, response.data.responses.size)
+ val topicProduceResponse = response.data.responses.asScala.head
+ assertEquals(1, topicProduceResponse.partitionResponses.size)
+ val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partitionProduceResponse.errorCode))
+ // LeaderId and epoch should be the same values inserted into the metadata cache
+ assertEquals(0, partitionProduceResponse.currentLeader.leaderId())
+ assertEquals(1, partitionProduceResponse.currentLeader.leaderEpoch())
+ assertEquals(1, response.data.nodeEndpoints.size)
+ val node = response.data.nodeEndpoints.asScala.head
+ assertEquals(0, node.nodeId)
+ assertEquals("broker0", node.host)
+ }
+ }
+
+ @Test
+ def testProduceResponseMetadataLookupErrorOnNotLeaderOrFollower(): Unit = {
+ val topic = "topic"
+ metadataCache = mock(classOf[ZkMetadataCache])
+
+ for (version <- 10 to ApiKeys.PRODUCE.latestVersion) {
+
+ reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+ val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+ val tp = new TopicPartition(topic, 0)
+
+ val produceRequest = ProduceRequest.forCurrentMagic(new ProduceRequestData()
+ .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+ Collections.singletonList(new ProduceRequestData.TopicProduceData()
+ .setName(tp.topic).setPartitionData(Collections.singletonList(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(tp.partition)
+ .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))))))
+ .iterator))
+ .setAcks(1.toShort)
+ .setTimeoutMs(5000))
+ .build(version.toShort)
+ val request = buildRequest(produceRequest)
+
+ when(replicaManager.appendRecords(anyLong,
+ anyShort,
+ ArgumentMatchers.eq(false),
+ ArgumentMatchers.eq(AppendOrigin.CLIENT),
+ any(),
+ responseCallback.capture(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any())
+ ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
+
+ when(replicaManager.getPartitionOrError(tp)).thenAnswer(_ => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+ when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+ when(metadataCache.contains(tp)).thenAnswer(_ => true)
+ when(metadataCache.getPartitionInfo(tp.topic(), tp.partition())).thenAnswer(_ => Option.empty)
+ when(metadataCache.getAliveBrokerNode(any(), any())).thenReturn(Option.empty)
+
+ createKafkaApis().handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+ val response = verifyNoThrottling[ProduceResponse](request)
+
+ assertEquals(1, response.data.responses.size)
+ val topicProduceResponse = response.data.responses.asScala.head
+ assertEquals(1, topicProduceResponse.partitionResponses.size)
+ val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(partitionProduceResponse.errorCode))
+ assertEquals(-1, partitionProduceResponse.currentLeader.leaderId())
+ assertEquals(-1, partitionProduceResponse.currentLeader.leaderEpoch())
+ assertEquals(0, response.data.nodeEndpoints.size)
+ }
+ }
+
@Test
def testTransactionalParametersSetCorrectly(): Unit = {
val topic = "topic"
@@ -3786,6 +3985,73 @@ class KafkaApisTest {
assertEquals(MemoryRecords.EMPTY, FetchResponse.recordsOrFail(partitionData))
}
+ @Test
+ def testFetchResponseContainsNewLeaderOnNotLeaderOrFollower(): Unit = {
+ val topicId = Uuid.randomUuid()
+ val tidp = new TopicIdPartition(topicId, new TopicPartition("foo", 0))
+ val tp = tidp.topicPartition
+ addTopicToMetadataCache(tp.topic, numPartitions = 1, numBrokers = 3, topicId)
+
+ when(replicaManager.getLogConfig(ArgumentMatchers.eq(tp))).thenReturn(Some(LogConfig.fromProps(
+ Collections.emptyMap(),
+ new Properties()
+ )))
+
+ val partition = mock(classOf[Partition])
+ val newLeaderId = 2
+ val newLeaderEpoch = 5
+
+ when(replicaManager.getPartitionOrError(tp)).thenAnswer(_ => Right(partition))
+ when(partition.leaderReplicaIdOpt).thenAnswer(_ => Some(newLeaderId))
+ when(partition.getLeaderEpoch).thenAnswer(_ => newLeaderEpoch)
+
+ when(replicaManager.fetchMessages(
+ any[FetchParams],
+ any[Seq[(TopicIdPartition, FetchRequest.PartitionData)]],
+ any[ReplicaQuota],
+ any[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]()
+ )).thenAnswer(invocation => {
+ val callback = invocation.getArgument(3).asInstanceOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit]
+ callback(Seq(tidp -> new FetchPartitionData(Errors.NOT_LEADER_OR_FOLLOWER, UnifiedLog.UnknownOffset, UnifiedLog.UnknownOffset, MemoryRecords.EMPTY,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)))
+ })
+
+ val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
+ Optional.empty())).asJava
+ val fetchDataBuilder = Map(tp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
+ Optional.empty())).asJava
+ val fetchMetadata = new JFetchMetadata(0, 0)
+ val fetchContext = new FullFetchContext(time, new FetchSessionCache(1000, 100),
+ fetchMetadata, fetchData, false, false)
+ when(fetchManager.newContext(
+ any[Short],
+ any[JFetchMetadata],
+ any[Boolean],
+ any[util.Map[TopicIdPartition, FetchRequest.PartitionData]],
+ any[util.List[TopicIdPartition]],
+ any[util.Map[Uuid, String]])).thenReturn(fetchContext)
+
+ when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+ val fetchRequest = new FetchRequest.Builder(16, 16, -1, -1, 100, 0, fetchDataBuilder)
+ .build()
+ val request = buildRequest(fetchRequest)
+
+ createKafkaApis().handleFetchRequest(request)
+
+ val response = verifyNoThrottling[FetchResponse](request)
+ val responseData = response.responseData(metadataCache.topicIdsToNames(), 16)
+
+ val partitionData = responseData.get(tp)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode)
+ assertEquals(newLeaderId, partitionData.currentLeader.leaderId())
+ assertEquals(newLeaderEpoch, partitionData.currentLeader.leaderEpoch())
+ val node = response.data.nodeEndpoints.asScala.head
+ assertEquals(2, node.nodeId)
+ assertEquals("broker2", node.host)
+ }
+
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testHandleJoinGroupRequest(version: Short): Unit = {