dajac commented on code in PR #12877:
URL: https://github.com/apache/kafka/pull/12877#discussion_r1031546311


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
       TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
       val response = receive[FetchResponse](socket, ApiKeys.FETCH, version)
       assertEquals(Errors.NONE, response.error)
-      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts())
+      assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
     } finally {
       socket.close()
     }
   }
+
+  @Test
+  def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+
+    val topicPartition = new TopicPartition(topic, 0)
+    val offsetMap = Map(topicPartition -> 10L)
+
+    val request = createConsumerFetchRequest(
+      maxResponseBytes = 1000,
+      maxPartitionBytes = 1000,
+      Seq(topicPartition),
+      offsetMap,
+      ApiKeys.FETCH.latestVersion,
+      maxWaitMs = 20000,
+      minBytes = 1,
+      rackId = followerBrokerId.toString
+    )
+    var response = connectAndReceive[FetchResponse](request, 
brokers(leaderBrokerId).socketServer)
+    assertEquals(Errors.NONE, response.error)
+    assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts)
+    validateFetchResponse(response, preferredReadReplica = 1)
+
+    // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()

Review Comment:
   Ah, OK. I did not see this because I thought that the purpose of the patch 
was to verify the ISR behavior. I guess that this extra test does not hurt, so 
we can keep it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to