This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new a954ad1c67c KAFKA-17331 Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec (#16873)
a954ad1c67c is described below

commit a954ad1c67ce213d40cb20c64b90b39e451645d9
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Sep 1 21:13:40 2024 +0800

    KAFKA-17331 Throw unsupported version exception if the server does NOT support EarliestLocalSpec and LatestTieredSpec (#16873)
    
    Add a server-side version check for the reserved (negative) timestamps:
    - the request version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
    - the request version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  2 -
 core/src/main/scala/kafka/server/KafkaApis.scala   | 70 ++++++++++++++--------
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 49 +++++++++++++++
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 19 ++++--
 .../scala/unit/kafka/server/LogOffsetTest.scala    | 18 +++---
 5 files changed, 115 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 092b8c92aa4..450f18b4ef6 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1400,8 +1400,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         startIndex = offsetTimeArray.length - 1
       case ListOffsetsRequest.EARLIEST_TIMESTAMP =>
         startIndex = 0
-      case ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
-        startIndex = 0
       case _ =>
         var isFound = false
         debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, 
%d".format(o._1, o._2)))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index cdd2d06ee2e..f68fec56951 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -84,7 +84,7 @@ import java.util.concurrent.{CompletableFuture, 
ConcurrentHashMap}
 import java.util.{Collections, Optional, OptionalInt}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
@@ -1111,35 +1111,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == 
ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+          // Negative timestamps are reserved for some functions.
+          // For v0 requests, negative timestamps only support 
LATEST_TIMESTAMP (-1) and EARLIEST_TIMESTAMP (-2).
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the 
entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+              topicPartition = topicPartition,
+              timestamp = partition.timestamp,
+              maxNumOffsets = partition.maxNumOffsets,
+              isFromConsumer = offsetRequest.replicaId == 
ListOffsetsRequest.CONSUMER_REPLICA_ID,
+              fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
             new ListOffsetsPartitionResponse()
               .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
+              .setErrorCode(Errors.NONE.code)
+              .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
+          } catch {
+            // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cases since these error messages
+            // are typically transient and there is no value in logging the 
entire stack trace for the same
+            case e @ (_ : UnknownTopicOrPartitionException |
+                      _ : NotLeaderOrFollowerException |
+                      _ : KafkaStorageException) =>
+              debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+                correlationId, clientId, topicPartition, e.getMessage))
+              new ListOffsetsPartitionResponse()
+                .setPartitionIndex(partition.partitionIndex)
+                .setErrorCode(Errors.forException(e).code)
+            case e: Throwable =>
+              error("Error while responding to offset request", e)
+              new ListOffsetsPartitionResponse()
+                .setPartitionIndex(partition.partitionIndex)
+                .setErrorCode(Errors.forException(e).code)
+          }
         }
       }
       new 
ListOffsetsTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
@@ -1152,6 +1160,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetsRequest]
     val version = request.header.apiVersion
+    val timestampMinSupportedVersion = immutable.Map[Long, Short](
+      ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
+      ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
+      ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
+      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
+      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
+    )
 
     def buildErrorResponse(e: Errors, partition: ListOffsetsPartition): 
ListOffsetsPartitionResponse = {
       new ListOffsetsPartitionResponse()
@@ -1178,6 +1193,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
               s"failed because the partition is duplicated in the request.")
           buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else if (partition.timestamp() < 0 &&
+          (!timestampMinSupportedVersion.contains(partition.timestamp()) || 
version < timestampMinSupportedVersion(partition.timestamp()))) {
+          buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)
         } else {
           try {
             val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index be1afc34947..7487d05edee 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4039,6 +4039,31 @@ class KafkaApisTest extends Logging {
     testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
   }
 
+  @Test
+  def testListOffsetMaxTimestampWithUnsupportedVersion(): Unit = {
+    
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.MAX_TIMESTAMP, 
6)
+  }
+
+  @Test
+  def testListOffsetEarliestLocalTimestampWithUnsupportedVersion(): Unit = {
+    
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
 7)
+  }
+
+  @Test
+  def testListOffsetLatestTieredTimestampWithUnsupportedVersion(): Unit = {
+    
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP,
 8)
+  }
+
+  @Test
+  def testListOffsetNegativeTimestampWithZeroVersion(): Unit = {
+    testConsumerListOffsetWithUnsupportedVersion(-3, 0)
+  }
+
+  @Test
+  def testListOffsetNegativeTimestampWithOneOrAboveVersion(): Unit = {
+    testConsumerListOffsetWithUnsupportedVersion(-6, 1)
+  }
+
   /**
    * Verifies that the metadata response is correct if the broker listeners 
are inconsistent (i.e. one broker has
    * more listeners than another) and the request is sent on the listener that 
exists in both brokers.
@@ -6152,6 +6177,30 @@ class KafkaApisTest extends Logging {
     verifyNoThrottling[MetadataResponse](requestChannelRequest)
   }
 
+  private def testConsumerListOffsetWithUnsupportedVersion(timestamp: Long, 
version: Short): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val targetTimes = List(new ListOffsetsTopic()
+      .setName(tp.topic)
+      .setPartitions(List(new ListOffsetsPartition()
+        .setPartitionIndex(tp.partition)
+        .setTimestamp(timestamp)).asJava)).asJava
+
+    val data = new 
ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
+    val listOffsetRequest = 
ListOffsetsRequest.parse(MessageUtil.toByteBuffer(data, version), version)
+    val request = buildRequest(listOffsetRequest)
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleListOffsetRequest(request)
+
+    val response = verifyNoThrottling[ListOffsetsResponse](request)
+    val partitionDataOptional = response.topics.asScala.find(_.name == 
tp.topic).get
+      .partitions.asScala.find(_.partitionIndex == tp.partition)
+    assertTrue(partitionDataOptional.isDefined)
+
+    val partitionData = partitionDataOptional.get
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, partitionData.errorCode)
+  }
+
   private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): 
Unit = {
     val tp = new TopicPartition("foo", 0)
     val latestOffset = 15L
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 03585be97c9..9c40efd29e4 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -258,24 +258,33 @@ class ListOffsetsRequestTest extends BaseRequestTest {
       if (version == 0) {
         assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
-        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, 
version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
       } else if (version >= 1 && version <= 3) {
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
-        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, 
version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
       } else if (version >= 4 && version <= 6) {
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
-        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
-      } else if (version >= 7) {
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, 
version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
+      } else if (version == 7) {
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
-        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
         assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
+        assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()), 
fetchOffsetAndEpochWithError(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
+      } else if (version >= 8) {
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+        assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 177f59221a9..3bcdf2b2592 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -250,17 +250,15 @@ class LogOffsetTest extends BaseRequestTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = 
Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush(false)
 
-    for (timestamp <- Seq(ListOffsetsRequest.EARLIEST_TIMESTAMP, 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) {
-      val offsets = log.legacyFetchOffsetsBefore(timestamp, 10)
-      assertEquals(Seq(0L), offsets)
+    val offsets = 
log.legacyFetchOffsetsBefore(ListOffsetsRequest.EARLIEST_TIMESTAMP, 10)
+    assertEquals(Seq(0L), offsets)
 
-      TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic, 
topicPartition.partition, broker),
-        "Leader should be elected")
-      val request = ListOffsetsRequest.Builder.forReplica(0, 0)
-        .setTargetTimes(buildTargetTimes(topicPartition, timestamp, 
10).asJava).build()
-      val consumerOffsets = 
findPartition(sendListOffsetsRequest(request).topics.asScala, 
topicPartition).oldStyleOffsets.asScala
-      assertEquals(Seq(0L), consumerOffsets)
-    }
+    TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic, 
topicPartition.partition, broker),
+      "Leader should be elected")
+    val request = ListOffsetsRequest.Builder.forReplica(0, 0)
+      .setTargetTimes(buildTargetTimes(topicPartition, 
ListOffsetsRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
+    val consumerOffsets = 
findPartition(sendListOffsetsRequest(request).topics.asScala, 
topicPartition).oldStyleOffsets.asScala
+    assertEquals(Seq(0L), consumerOffsets)
   }
 
   /* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` 
changes after each invocation (simulating

Reply via email to