This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new edc2c4f  KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set 
MaxNumOffsets field default to 1
edc2c4f is described below

commit edc2c4fd8d1e91237b08c8df70628e74637e4e47
Author: Manikumar Reddy <[email protected]>
AuthorDate: Mon Nov 2 23:39:03 2020 +0530

    KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets 
field default to 1
    
    A couple of failures were observed after KAFKA-9627: Replace ListOffset 
request/response with automated protocol 
(https://github.com/apache/kafka/pull/8295)
    
    1. The latest consumer fails to consume from 0.10.0.1 brokers. The system 
tests below are failing:
    
kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest
    
kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest
    
    Solution: The current default value for MaxNumOffsets is 0. Because of this, 
brokers are not returning offsets for v0 requests. Set the default value for the 
MaxNumOffsets field to 1. This is similar to the previous [approach]
    
(https://github.com/apache/kafka/blob/2.6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java#L204)
    
    2. In some scenarios, the latest consumer fails with the error below when 
connecting to a Kafka cluster which consists of newer and older (<=2.0) Kafka 
brokers:
    `org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
write a non-default currentLeaderEpoch at version 3`
    
    Solution: After #8295, the consumer can set a non-default CurrentLeaderEpoch 
value for requests at v3 and below. The solution adopted here is to make 
CurrentLeaderEpoch ignorable.
    
    Author: Manikumar Reddy <[email protected]>
    
    Reviewers: David Jacot <[email protected]>
    
    Closes #9540 from omkreddy/fix-listoffsets
    
    (cherry picked from commit 236d7dc890e82c9b146579a8be801c1c7f54feb9)
    Signed-off-by: Manikumar Reddy <[email protected]>
---
 .../common/message/ListOffsetRequest.json          |  4 +--
 .../kafka/common/requests/RequestResponseTest.java | 13 +++++-----
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 30 +++++++++++++++++-----
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json 
b/clients/src/main/resources/common/message/ListOffsetRequest.json
index 259d7bf..5ecc2d6 100644
--- a/clients/src/main/resources/common/message/ListOffsetRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetRequest.json
@@ -42,11 +42,11 @@
         "about": "Each partition in the request.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
-        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", 
"default": "-1",
+        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", 
"default": "-1", "ignorable": true,
           "about": "The current leader epoch." },
         { "name": "Timestamp", "type": "int64", "versions": "0+",
           "about": "The current timestamp." },
-        { "name": "MaxNumOffsets", "type": "int32", "versions": "0",
+        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", 
"default": "1",
           "about": "The maximum number of offsets to report." }
       ]}
     ]}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0862e2b..71048d8 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1243,7 +1243,8 @@ public class RequestResponseTest {
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
                             .setTimestamp(1000000L)
-                            .setMaxNumOffsets(10)));
+                            .setMaxNumOffsets(10)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1253,7 +1254,8 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)));
+                            .setTimestamp(1000000L)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1261,10 +1263,9 @@ public class RequestResponseTest {
         } else if (version >= 2 && version <= 5) {
             ListOffsetPartition partition = new ListOffsetPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L);
-            if (version >= 4) {
-                partition.setCurrentLeaderEpoch(5);
-            }
+                    .setTimestamp(1000000L)
+                    .setCurrentLeaderEpoch(5);
+
             ListOffsetTopic topic = new ListOffsetTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index cedbf0a..ce324c7 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionData = response.topics.asScala.find(_.name == topic).get
       .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-    (partitionData.offset, partitionData.leaderEpoch)
+    if (version == 0) {
+      if (partitionData.oldStyleOffsets().isEmpty)
+        (-1, partitionData.leaderEpoch)
+      else
+        (partitionData.oldStyleOffsets().asScala.head, 
partitionData.leaderEpoch)
+    } else
+      (partitionData.offset, partitionData.leaderEpoch)
   }
 
   @Test
@@ -174,17 +180,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   }
 
   @Test
-  def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
+  def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
     TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-    assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
-    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
+    for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion) {
+      if (version == 0) {
+        assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      } else if (version >= 1 && version <= 3) {
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      } else if (version >= 4) {
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+      }
+    }
   }
 
   private def assertResponseError(error: Errors, brokerId: Int, request: 
ListOffsetRequest): Unit = {

Reply via email to