This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new a954ad1c67c KAFKA-17331 Throw unsupported version exception if the
server does NOT support EarliestLocalSpec and LatestTieredSpec (#16873)
a954ad1c67c is described below
commit a954ad1c67ce213d40cb20c64b90b39e451645d9
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Sep 1 21:13:40 2024 +0800
KAFKA-17331 Throw unsupported version exception if the server does NOT
support EarliestLocalSpec and LatestTieredSpec (#16873)
Add the version check to the server side for the specific timestamps:
- the version must be >=8 if timestamp=-4L
- the version must be >=9 if timestamp=-5L
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 2 -
core/src/main/scala/kafka/server/KafkaApis.scala | 70 ++++++++++++++--------
.../scala/unit/kafka/server/KafkaApisTest.scala | 49 +++++++++++++++
.../unit/kafka/server/ListOffsetsRequestTest.scala | 19 ++++--
.../scala/unit/kafka/server/LogOffsetTest.scala | 18 +++---
5 files changed, 115 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 092b8c92aa4..450f18b4ef6 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1400,8 +1400,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
startIndex = offsetTimeArray.length - 1
case ListOffsetsRequest.EARLIEST_TIMESTAMP =>
startIndex = 0
- case ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
- startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d,
%d".format(o._1, o._2)))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index cdd2d06ee2e..f68fec56951 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -84,7 +84,7 @@ import java.util.concurrent.{CompletableFuture,
ConcurrentHashMap}
import java.util.{Collections, Optional, OptionalInt}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@@ -1111,35 +1111,43 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseTopics = authorizedRequestInfo.map { topic =>
val responsePartitions = topic.partitions.asScala.map { partition =>
- val topicPartition = new TopicPartition(topic.name,
partition.partitionIndex)
-
- try {
- val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
- topicPartition = topicPartition,
- timestamp = partition.timestamp,
- maxNumOffsets = partition.maxNumOffsets,
- isFromConsumer = offsetRequest.replicaId ==
ListOffsetsRequest.CONSUMER_REPLICA_ID,
- fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+ if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+ // Negative timestamps are reserved for some functions.
+ // For v0 requests, negative timestamps only support
LATEST_TIMESTAMP (-1) and EARLIEST_TIMESTAMP (-2).
new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
- .setErrorCode(Errors.NONE.code)
- .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
- } catch {
- // NOTE: UnknownTopicOrPartitionException and
NotLeaderOrFollowerException are special cases since these error messages
- // are typically transient and there is no value in logging the
entire stack trace for the same
- case e @ (_ : UnknownTopicOrPartitionException |
- _ : NotLeaderOrFollowerException |
- _ : KafkaStorageException) =>
- debug("Offset request with correlation id %d from client %s on
partition %s failed due to %s".format(
- correlationId, clientId, topicPartition, e.getMessage))
- new ListOffsetsPartitionResponse()
- .setPartitionIndex(partition.partitionIndex)
- .setErrorCode(Errors.forException(e).code)
- case e: Throwable =>
- error("Error while responding to offset request", e)
+ .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ } else {
+ val topicPartition = new TopicPartition(topic.name,
partition.partitionIndex)
+
+ try {
+ val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+ topicPartition = topicPartition,
+ timestamp = partition.timestamp,
+ maxNumOffsets = partition.maxNumOffsets,
+ isFromConsumer = offsetRequest.replicaId ==
ListOffsetsRequest.CONSUMER_REPLICA_ID,
+ fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
new ListOffsetsPartitionResponse()
.setPartitionIndex(partition.partitionIndex)
- .setErrorCode(Errors.forException(e).code)
+ .setErrorCode(Errors.NONE.code)
+ .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
+ } catch {
+ // NOTE: UnknownTopicOrPartitionException and
NotLeaderOrFollowerException are special cases since these error messages
+ // are typically transient and there is no value in logging the
entire stack trace for the same
+ case e @ (_ : UnknownTopicOrPartitionException |
+ _ : NotLeaderOrFollowerException |
+ _ : KafkaStorageException) =>
+ debug("Offset request with correlation id %d from client %s on
partition %s failed due to %s".format(
+ correlationId, clientId, topicPartition, e.getMessage))
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(partition.partitionIndex)
+ .setErrorCode(Errors.forException(e).code)
+ case e: Throwable =>
+ error("Error while responding to offset request", e)
+ new ListOffsetsPartitionResponse()
+ .setPartitionIndex(partition.partitionIndex)
+ .setErrorCode(Errors.forException(e).code)
+ }
}
}
new
ListOffsetsTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
@@ -1152,6 +1160,13 @@ class KafkaApis(val requestChannel: RequestChannel,
val clientId = request.header.clientId
val offsetRequest = request.body[ListOffsetsRequest]
val version = request.header.apiVersion
+ val timestampMinSupportedVersion = immutable.Map[Long, Short](
+ ListOffsetsRequest.EARLIEST_TIMESTAMP -> 1.toShort,
+ ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
+ ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
+ ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
+ ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
+ )
def buildErrorResponse(e: Errors, partition: ListOffsetsPartition):
ListOffsetsPartitionResponse = {
new ListOffsetsPartitionResponse()
@@ -1178,6 +1193,9 @@ class KafkaApis(val requestChannel: RequestChannel,
debug(s"OffsetRequest with correlation id $correlationId from client
$clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
+ } else if (partition.timestamp() < 0 &&
+ (!timestampMinSupportedVersion.contains(partition.timestamp()) ||
version < timestampMinSupportedVersion(partition.timestamp()))) {
+ buildErrorResponse(Errors.UNSUPPORTED_VERSION, partition)
} else {
try {
val fetchOnlyFromLeader = offsetRequest.replicaId !=
ListOffsetsRequest.DEBUGGING_REPLICA_ID
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index be1afc34947..7487d05edee 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4039,6 +4039,31 @@ class KafkaApisTest extends Logging {
testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
}
+ @Test
+ def testListOffsetMaxTimestampWithUnsupportedVersion(): Unit = {
+
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.MAX_TIMESTAMP,
6)
+ }
+
+ @Test
+ def testListOffsetEarliestLocalTimestampWithUnsupportedVersion(): Unit = {
+
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
7)
+ }
+
+ @Test
+ def testListOffsetLatestTieredTimestampWithUnsupportedVersion(): Unit = {
+
testConsumerListOffsetWithUnsupportedVersion(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP,
8)
+ }
+
+ @Test
+ def testListOffsetNegativeTimestampWithZeroVersion(): Unit = {
+ testConsumerListOffsetWithUnsupportedVersion(-3, 0)
+ }
+
+ @Test
+ def testListOffsetNegativeTimestampWithOneOrAboveVersion(): Unit = {
+ testConsumerListOffsetWithUnsupportedVersion(-6, 1)
+ }
+
/**
* Verifies that the metadata response is correct if the broker listeners
are inconsistent (i.e. one broker has
* more listeners than another) and the request is sent on the listener that
exists in both brokers.
@@ -6152,6 +6177,30 @@ class KafkaApisTest extends Logging {
verifyNoThrottling[MetadataResponse](requestChannelRequest)
}
+ private def testConsumerListOffsetWithUnsupportedVersion(timestamp: Long,
version: Short): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val targetTimes = List(new ListOffsetsTopic()
+ .setName(tp.topic)
+ .setPartitions(List(new ListOffsetsPartition()
+ .setPartitionIndex(tp.partition)
+ .setTimestamp(timestamp)).asJava)).asJava
+
+ val data = new
ListOffsetsRequestData().setTopics(targetTimes).setReplicaId(ListOffsetsRequest.CONSUMER_REPLICA_ID)
+ val listOffsetRequest =
ListOffsetsRequest.parse(MessageUtil.toByteBuffer(data, version), version)
+ val request = buildRequest(listOffsetRequest)
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handleListOffsetRequest(request)
+
+ val response = verifyNoThrottling[ListOffsetsResponse](request)
+ val partitionDataOptional = response.topics.asScala.find(_.name ==
tp.topic).get
+ .partitions.asScala.find(_.partitionIndex == tp.partition)
+ assertTrue(partitionDataOptional.isDefined)
+
+ val partitionData = partitionDataOptional.get
+ assertEquals(Errors.UNSUPPORTED_VERSION.code, partitionData.errorCode)
+ }
+
private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel):
Unit = {
val tp = new TopicPartition("foo", 0)
val latestOffset = 15L
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 03585be97c9..9c40efd29e4 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -258,24 +258,33 @@ class ListOffsetsRequestTest extends BaseRequestTest {
if (version == 0) {
assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L,
version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
- assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP,
version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version >= 1 && version <= 3) {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L,
version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
- assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP,
version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
} else if (version >= 4 && version <= 6) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L,
version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
- assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
- } else if (version >= 7) {
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP,
version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
+ } else if (version == 7) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L,
version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
- assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
+ assertEquals((-1L, -1, Errors.UNSUPPORTED_VERSION.code()),
fetchOffsetAndEpochWithError(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
+ } else if (version >= 8) {
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L,
version.toShort))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+ assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+ assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 177f59221a9..3bcdf2b2592 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -250,17 +250,15 @@ class LogOffsetTest extends BaseRequestTest {
log.appendAsLeader(TestUtils.singletonRecords(value =
Integer.toString(42).getBytes()), leaderEpoch = 0)
log.flush(false)
- for (timestamp <- Seq(ListOffsetsRequest.EARLIEST_TIMESTAMP,
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) {
- val offsets = log.legacyFetchOffsetsBefore(timestamp, 10)
- assertEquals(Seq(0L), offsets)
+ val offsets =
log.legacyFetchOffsetsBefore(ListOffsetsRequest.EARLIEST_TIMESTAMP, 10)
+ assertEquals(Seq(0L), offsets)
- TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic,
topicPartition.partition, broker),
- "Leader should be elected")
- val request = ListOffsetsRequest.Builder.forReplica(0, 0)
- .setTargetTimes(buildTargetTimes(topicPartition, timestamp,
10).asJava).build()
- val consumerOffsets =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).oldStyleOffsets.asScala
- assertEquals(Seq(0L), consumerOffsets)
- }
+ TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic,
topicPartition.partition, broker),
+ "Leader should be elected")
+ val request = ListOffsetsRequest.Builder.forReplica(0, 0)
+ .setTargetTimes(buildTargetTimes(topicPartition,
ListOffsetsRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
+ val consumerOffsets =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).oldStyleOffsets.asScala
+ assertEquals(Seq(0L), consumerOffsets)
}
/* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size`
changes after each invocation (simulating