dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1132627550


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -302,6 +320,19 @@ public String toString() {
         }
     }
 
+    // Downgrades the ReplicaState field to be compatible with lower version.
+    public static void maybeDownGradeReplicaState(FetchRequestData 
fetchRequestData, short version) {
+        if (version < 15) {
+            
fetchRequestData.setReplicaId(fetchRequestData.replicaState().replicaId());
+            fetchRequestData.setReplicaState(new ReplicaState());
+        }
+    }
+
+    public static int replicaId(FetchRequestData fetchRequestData) {
+        return fetchRequestData.replicaId() != -1 ?
+                fetchRequestData.replicaId() : 
fetchRequestData.replicaState().replicaId();

Review Comment:
   nit: Should we keep it on a single line?



##########
clients/src/test/java/org/apache/kafka/common/requests/FetchRequestTest.java:
##########
@@ -198,6 +202,24 @@ public void testForgottenTopics(short version) {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("fetchVersions")

Review Comment:
   nit: You can use `@ApiKeyVersionsSource` here.



##########
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##########
@@ -105,14 +108,18 @@ class KafkaNetworkChannel(
   private val correlationIdCounter = new AtomicInteger(0)
   private val endpoints = mutable.HashMap.empty[Int, Node]
 
-  private val requestThread = new RaftSendThread(
+  private var requestThread = new RaftSendThread(
     name = threadNamePrefix + "-outbound-request-thread",
     networkClient = client,
     requestTimeoutMs = requestTimeoutMs,
     time = time,
     isInterruptible = false
   )
 
+  def setRequestThread(raftSendThread: RaftSendThread): Unit = {
+    requestThread = raftSendThread
+  }

Review Comment:
   Hmm... I would prefer to avoid doing this. In the test, it seems that 
you could rely on the mocked client instead. Could you please 
give it a try?



##########
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##########
@@ -58,7 +58,7 @@ class RemoteLeaderEndPointTest {
         blockingSend = new MockBlockingSender(offsets = new 
util.HashMap[TopicPartition, EpochEndOffset](),
             sourceBroker = sourceBroker, time = time)
         endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, 
fetchSessionHandler,
-            config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION)
+            config, replicaManager, QuotaFactory.UnboundedQuota, () => 
MetadataVersion.MINIMUM_KRAFT_VERSION, () => 1)

Review Comment:
   nit: Should we add a small unit test in this file to ensure that the value 
is propagated correctly?



##########
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##########
@@ -159,6 +167,29 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("fetchVersions"))
+  def testFetchRequestDowngrade(version: Short): Unit = {
+    val destinationId = 2
+    val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+    channel.updateEndpoint(destinationId, new InetAddressSpec(
+      new InetSocketAddress(destinationNode.host, destinationNode.port)))
+    val mockSendThread = mock(classOf[RaftSendThread])
+    channel.setRequestThread(mockSendThread)

Review Comment:
   As mentioned earlier, I would prefer to avoid this.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3366,8 +3366,10 @@ class KafkaApisTest {
     when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
-    val fetchRequest = new FetchRequest.Builder(9, 9, -1, 100, 0, 
fetchDataBuilder)
+    val fetchRequest = new FetchRequest.Builder(9, 9, -1, -1, 100, 0, 
fetchDataBuilder)
       .build()
+    assertEquals(fetchRequest.replicaEpoch(), -1)
+    assertEquals(fetchRequest.replicaId(), -1)

Review Comment:
   Those assertions are not really useful here because they don't test what the 
KafkaApis layer does. Let's remove them.



##########
core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala:
##########
@@ -159,6 +167,29 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("fetchVersions"))

Review Comment:
   nit: You can use `@ApiKeyVersionsSource` as well.



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -208,6 +210,7 @@ class ReplicaFetcherThreadTest {
 
     //Assert that truncate to is called exactly once (despite two loops)
     verify(partition, times(3)).truncateTo(anyLong(), anyBoolean())
+    assertTrue(mockBrokerEpochSupplier.getCounter() > 0)

Review Comment:
   It is really weird to add those assertions in random tests. They don't bring 
much value so I would remove them.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -1272,6 +1274,7 @@ class ReplicaManagerTest {
       }
 
       verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(tp), 
any())
+      assertTrue(mockBrokerEpochSupplier.getCounter() > 0)

Review Comment:
   This one does not bring much value either. I would remove it.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1438,28 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    @Test
+    public void testFetchRequestVersionHandling() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // Latest fetch request.
+        FetchRequestData fetchRequestData = context.fetchRequest(
+                epoch, context.clusterId.toString(), otherNodeId, 0L, 0, 0);
+        context.deliverRequest(fetchRequestData);
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
+
+        FetchRequest.maybeDownGradeReplicaState(fetchRequestData, (short) 14);

Review Comment:
   nit: Could we add a comment here to be consistent with the previous block?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -958,7 +959,10 @@ private CompletableFuture<FetchResponseData> 
handleFetchRequest(
                 Errors.INVALID_REQUEST, Optional.empty()));
         }
 
-        FetchResponseData response = 
tryCompleteFetchRequest(request.replicaId(), fetchPartition, currentTimeMs);
+        FetchResponseData response = tryCompleteFetchRequest(
+            FetchRequest.replicaId(request),

Review Comment:
   There are other usages of `request.replicaId()` in this file. Could we 
extend the unit tests to cover them all?



##########
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##########
@@ -44,7 +44,10 @@ object KafkaNetworkChannel {
       case fetchRequest: FetchRequestData =>
         // Since we already have the request, we go through a simplified 
builder
         new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-          override def build(version: Short): FetchRequest = new 
FetchRequest(fetchRequest, version)
+          override def build(version: Short): FetchRequest = {
+            FetchRequest.maybeDownGradeReplicaState(fetchRequest, version)
+            new FetchRequest(fetchRequest, version)
+          }

Review Comment:
   As we have logic in here now, I wonder if defining a `SimpleBuilder` in the 
`FetchRequest` class would make sense. This would allow us to write unit tests 
for it as well. What do you think?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -108,13 +108,15 @@ object PartitionTest {
 
   def followerFetchParams(
     replicaId: Int,
+    brokerEpoch: Long = 1L,

Review Comment:
   nit: `replicaEpoch`?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4527,8 +4532,11 @@ class KafkaApisTest {
 
     val fetchDataBuilder = Collections.singletonMap(tp0, new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, 
Optional.of(leaderEpoch)))
     val fetchData = Collections.singletonMap(tidp0, new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, Int.MaxValue, 
Optional.of(leaderEpoch)))
-    val fetchFromFollower = buildRequest(new FetchRequest.Builder(
-      ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 
0, fetchDataBuilder).build())
+    val fetchRequest = new FetchRequest.Builder(
+      ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1, 
1000, 0, fetchDataBuilder).build()
+    assertEquals(fetchRequest.replicaId(), 1)
+    assertEquals(fetchRequest.replicaEpoch(), 1)

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3421,8 +3423,11 @@ class KafkaApisTest {
       any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
 
     // If replicaId is -1 we will build a consumer request. Any non-negative 
replicaId will build a follower request.
+    val replicaEpoch = if (replicaId < 0) -1 else 1
     val fetchRequest = new FetchRequest.Builder(ApiKeys.FETCH.latestVersion, 
ApiKeys.FETCH.latestVersion,
-      replicaId, 100, 0, fetchDataBuilder).metadata(fetchMetadata).build()
+      replicaId, replicaEpoch, 100, 0, 
fetchDataBuilder).metadata(fetchMetadata).build()
+    assertEquals(fetchRequest.replicaEpoch(), replicaEpoch)
+    assertEquals(fetchRequest.replicaId(), replicaId)

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -1103,14 +1136,18 @@ class ReplicaFetcherThreadTest {
     assertProcessPartitionDataWhen(isReassigning = false)
   }
 
-  @Test
-  def testBuildFetch(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testBuildFetch(usingReplicaState: Boolean): Unit = {

Review Comment:
   Instead of hijacking this test, could we add a new and simpler unit test 
focused on what we want to test? Note that you could use `@EnumSource(value = 
MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})` (with the 
relevant IBPs).



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1438,28 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, 
epoch, OptionalInt.of(localId));
     }
 
+    @Test
+    public void testFetchRequestVersionHandling() throws Exception {
+        int localId = 0;
+        int otherNodeId = 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        // Latest fetch request.

Review Comment:
   nit: `Latest version of the fetch request`?



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2098,7 +2101,8 @@ class ReplicaManagerTest {
                                                  offsetFromLeader: Long = 5,
                                                  leaderEpochFromLeader: Int = 
3,
                                                  extraProps: Properties = new 
Properties(),
-                                                 topicId: Option[Uuid] = 
None): (ReplicaManager, LogManager) = {
+                                                 topicId: Option[Uuid] = None,
+                                                 brokerEpochSupplier: () => 
Long = null): (ReplicaManager, LogManager) = {

Review Comment:
   Is `null` expected here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to