dajac commented on code in PR #12877: URL: https://github.com/apache/kafka/pull/12877#discussion_r1031546311
########## core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala: ########## @@ -84,9 +84,58 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1) val response = receive[FetchResponse](socket, ApiKeys.FETCH, version) assertEquals(Errors.NONE, response.error) - assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts()) + assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts) } finally { socket.close() } } + + @Test + def testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(): Unit = { + // Create a topic with 2 replicas where broker 0 is the leader and 1 is the follower. + val admin = createAdminClient() + TestUtils.createTopicWithAdmin( + admin, + topic, + brokers, + replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) + ) + + TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10) + + val topicPartition = new TopicPartition(topic, 0) + val offsetMap = Map(topicPartition -> 10L) + + val request = createConsumerFetchRequest( + maxResponseBytes = 1000, + maxPartitionBytes = 1000, + Seq(topicPartition), + offsetMap, + ApiKeys.FETCH.latestVersion, + maxWaitMs = 20000, + minBytes = 1, + rackId = followerBrokerId.toString + ) + var response = connectAndReceive[FetchResponse](request, brokers(leaderBrokerId).socketServer) + assertEquals(Errors.NONE, response.error) + assertEquals(Map(Errors.NONE -> 2).asJava, response.errorCounts) + validateFetchResponse(response, preferredReadReplica = 1) + + // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms + brokers(followerBrokerId).shutdown() Review Comment: ah ok. i did not see this because i thought that the purpose of the patch was to verify the ISR behavior. i guess that this extra test does not hurt so we can keep it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org