showuon commented on code in PR #12674:
URL: https://github.com/apache/kafka/pull/12674#discussion_r978213955


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -949,14 +947,17 @@ class KafkaApis(val requestChannel: RequestChannel,
       val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, 
config.fetchMaxBytes), maxQuotaWindowBytes)
       val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
 
-      val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
-        // Fetch API version 11 added preferred replica logic
+      val clientMetadata = if (versionId >= 11 && 
replicaManager.hasReplicaSelector) {
+        // Fetching from follower is only supported from Fetch API version 11. 
Moreover, we
+        // only allow it if the broker has a replica selector configured. If 
it does not, there

Review Comment:
   nit: there [is]



##########
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala:
##########
@@ -285,8 +315,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
 
     // Check follower error codes
     val followerId = TestUtils.findFollowerId(topicPartition, servers)
-    assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
-    assertResponseErrorForEpoch(Errors.NONE, followerId, 
Optional.of(secondLeaderEpoch))
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, 
Optional.empty())
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, 
Optional.of(secondLeaderEpoch))

Review Comment:
   I'm afraid this will cause some confusion or possibly hide some potential 
bugs.
   I'm thinking we can override the `@BeforeEach` `setUp` method to create a test 
broker with a replica selector config for this test, to avoid the 
`NOT_LEADER_OR_FOLLOWER` error. For example:
   
   ```scala
   class FetchRequestTest extends BaseFetchRequestTest {
     var shouldCreateReplicaSelector = false
   
     @BeforeEach
     override def setUp(testInfo: TestInfo): Unit = {
       if (testInfo.getDisplayName().equals("testCurrentEpochValidation")) {
         shouldCreateReplicaSelector = true
       }
       super.setUp(testInfo)
     }
   
     override def generateConfigs: Seq[KafkaConfig] = {
       TestUtils.createBrokerConfigs(1, zkConnect).map( p => {
         if (shouldCreateReplicaSelector)
           p.setProperty("replica.selector.class", "mock.class.name")
         KafkaConfig.fromProps(p)
       })
     }
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to