showuon commented on code in PR #12674:
URL: https://github.com/apache/kafka/pull/12674#discussion_r978213955
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -949,14 +947,17 @@ class KafkaApis(val requestChannel: RequestChannel,
val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes,
config.fetchMaxBytes), maxQuotaWindowBytes)
val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
- val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
- // Fetch API version 11 added preferred replica logic
+ val clientMetadata = if (versionId >= 11 &&
replicaManager.hasReplicaSelector) {
+ // Fetching from follower is only supported from Fetch API version 11.
Moreover, we
+ // only allow it if the broker has a replica selector configured. If
it does not, there
Review Comment:
nit: there [is]
##########
core/src/test/scala/unit/kafka/server/FetchRequestTest.scala:
##########
@@ -285,8 +315,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
// Check follower error codes
val followerId = TestUtils.findFollowerId(topicPartition, servers)
- assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
- assertResponseErrorForEpoch(Errors.NONE, followerId,
Optional.of(secondLeaderEpoch))
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId,
Optional.empty())
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId,
Optional.of(secondLeaderEpoch))
Review Comment:
I'm afraid this will cause some confusion or possibly hide a potential bug.
I think we could override the `@BeforeEach` `setUp` method to create a test
broker with a replica selector configured for this test, which would avoid the
`NOT_LEADER_OR_FOLLOWER` error.
For example:
```scala
class FetchRequestTest extends BaseFetchRequestTest {
  var shouldCreateReplicaSelector = false

  @BeforeEach
  override def setUp(testInfo: TestInfo): Unit = {
    if (testInfo.getDisplayName().equals("testCurrentEpochValidation")) {
      shouldCreateReplicaSelector = true
    }
    super.setUp(testInfo)
  }

  override def generateConfigs: Seq[KafkaConfig] = {
    TestUtils.createBrokerConfigs(1, zkConnect).map( p => {
      if (shouldCreateReplicaSelector)
        p.setProperty("replica.selector.class", "mock.class.name")
      KafkaConfig.fromProps(p)
    })
  }

  // ... existing tests ...
}
```
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]