artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1044843523


##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)

Review Comment:
   For the new unit test I added a function that gets the preferred replica, so 
all the explicit code to build and send the fetch request was moved into that 
new function.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1154,7 +1155,9 @@ Node selectReadReplica(TopicPartition partition, Node 
leaderReplica, long curren
             } else {
                 log.trace("Not fetching from {} for partition {} since it is 
marked offline or is missing from our metadata," +
                           " using the leader instead.", nodeId, partition);
-                subscriptions.clearPreferredReadReplica(partition);
+                // Note that this condition may happen due to stale metadata, 
so we clear preferred replica and
+                // refresh metadata.
+                requestMetadataUpdate(partition);

Review Comment:
   Added a unit test. Verified that it failed without this change and passed 
with this change.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = 
brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition,
 listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")
+
+    assertEquals(-1, getPreferredReplica)
+  }
+
+  @Test
+  def testFetchFromFollowerWithRoll(): Unit = {
+    // Create a topic with 2 replicas where broker 0 is the leader and 1 is 
the follower.
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin,
+      topic,
+      brokers,
+      replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
+    )
+
+    // Create consumer with client.rack = follower id.
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.CLIENT_RACK_CONFIG, 
followerBrokerId.toString)
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, 
new ByteArrayDeserializer)
+    try {
+      consumer.subscribe(List(topic).asJava)
+
+      // Wait until preferred replica is set to follower.
+      TestUtils.waitUntilTrue(() => {
+        getPreferredReplica == 1
+      }, "Preferred replica is not set")
+
+      // Produce and consume.
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+
+      // Shutdown follower, produce and consume should work.
+      brokers(followerBrokerId).shutdown()
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+
+      // Start the follower and wait until preferred replica is set to 
follower.
+      brokers(followerBrokerId).startup()
+      TestUtils.waitUntilTrue(() => {
+        getPreferredReplica == 1
+      }, "Preferred replica is not set")
+
+      // Produce and consume should still work.
+      TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 1)
+      TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
+    } finally {
+      consumer.close()
+    }
+  }
+
+  private def getPreferredReplica: Int = {

Review Comment:
   The logic that was previously written out explicitly in a unit test has been 
moved into this utility function.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = 
brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition,
 listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")

Review Comment:
   This is actually not changed.



##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends 
BaseFetchRequestTest {
 
     TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
 
+    assertEquals(1, getPreferredReplica)
+
+    // Shutdown follower broker. Consumer will reach out to leader after 
metadata.max.age.ms
+    brokers(followerBrokerId).shutdown()
+    val topicPartition = new TopicPartition(topic, 0)
+    TestUtils.waitUntilTrue(() => {
+      val endpoints = 
brokers(leaderBrokerId).metadataCache.getPartitionReplicaEndpoints(topicPartition,
 listenerName)
+      !endpoints.contains(followerBrokerId)
+    }, "follower is still reachable.")
+
+    assertEquals(-1, getPreferredReplica)
+  }
+
+  @Test
+  def testFetchFromFollowerWithRoll(): Unit = {

Review Comment:
   This is the new unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to