This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8d93d1096c2  KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API (#16584)
8d93d1096c2 is described below

commit 8d93d1096c254cd98743cd51ccacb2dc6a815efc
Author: Abhijeet Kumar <abhijeet.cse....@gmail.com>
AuthorDate: Wed Aug 27 08:34:31 2025 +0530

    KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API (#16584)

    This is the first part of the implementation of
    [KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)

    The purpose of this pull request is for the broker to start returning the correct
    offset when it receives a -6 as a timestamp in a ListOffsets API request.

    Added unit tests for the new timestamp.

    Reviewers: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +
 .../org/apache/kafka/clients/admin/OffsetSpec.java |  10 +
 .../admin/internals/ListOffsetsHandler.java        |   7 +-
 .../kafka/common/requests/ListOffsetsRequest.java  |  11 +-
 .../common/message/ListOffsetsRequest.json         |   4 +-
 .../common/message/ListOffsetsResponse.json        |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  28 +++
 .../common/requests/ListOffsetsRequestTest.java    |  10 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   3 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 227 ++++++++++++++++++++-
 .../kafka/server/common/MetadataVersion.java       |   4 +-
 .../kafka/server/common/MetadataVersionTest.java   |   4 +-
 .../kafka/storage/internals/log/UnifiedLog.java    |  27 +++
 .../org/apache/kafka/tools/GetOffsetShell.java     |   8 +-
 .../kafka/tools/GetOffsetShellParsingTest.java     |   2 +-
 .../org/apache/kafka/tools/GetOffsetShellTest.java |  24 +++
 16 files changed, 358 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 270a7124826..90f83eac935 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -5154,6 +5154,8 @@ public class KafkaAdminClient extends AdminClient {
             return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
         } else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
             return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
+        } else if (offsetSpec instanceof OffsetSpec.EarliestPendingUploadSpec) {
+            return ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP;
         }
         return ListOffsetsRequest.LATEST_TIMESTAMP;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 68f94cc493e..ad73c8d51f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -28,6 +28,7 @@ public class OffsetSpec {
     public static class MaxTimestampSpec extends OffsetSpec { }
     public static class EarliestLocalSpec extends OffsetSpec { }
     public static class LatestTieredSpec extends OffsetSpec { }
+    public static class EarliestPendingUploadSpec extends OffsetSpec { }

     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
@@ -91,4 +92,13 @@ public class OffsetSpec {
     public static OffsetSpec latestTiered() {
         return new LatestTieredSpec();
     }
+
+    /**
+     * Used to retrieve the earliest offset of records that are pending upload to remote storage.
+ * <br/> + * Note: When tiered storage is not enabled, we will return unknown offset. + */ + public static OffsetSpec earliestPendingUpload() { + return new EarliestPendingUploadSpec(); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index f7c495d7fd8..a46d6f24a7b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -103,12 +103,17 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset .stream() .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP); + boolean requireEarliestPendingUploadTimestamp = keys + .stream() + .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP); + int timeoutMs = options.timeoutMs() != null ? options.timeoutMs() : defaultApiTimeoutMs; return ListOffsetsRequest.Builder.forConsumer(true, options.isolationLevel(), supportsMaxTimestamp, requireEarliestLocalTimestamp, - requireTieredStorageTimestamp) + requireTieredStorageTimestamp, + requireEarliestPendingUploadTimestamp) .setTargetTimes(new ArrayList<>(topicsByName.values())) .setTimeoutMs(timeoutMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java index 7415412d050..5862ebdfafc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { public static final long LATEST_TIERED_TIMESTAMP = -5L; + public static final long EARLIEST_PENDING_UPLOAD_TIMESTAMP = -6L; + public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; @@ -58,16 +60,19 @@ public class ListOffsetsRequest extends AbstractRequest { public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) { - return forConsumer(requireTimestamp, isolationLevel, false, false, false); + return forConsumer(requireTimestamp, isolationLevel, false, false, false, false); } public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp, boolean requireEarliestLocalTimestamp, - boolean requireTieredStorageTimestamp) { + boolean requireTieredStorageTimestamp, + boolean requireEarliestPendingUploadTimestamp) { short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion(); - if (requireTieredStorageTimestamp) + if (requireEarliestPendingUploadTimestamp) + minVersion = 11; + else if (requireTieredStorageTimestamp) minVersion = 9; else if (requireEarliestLocalTimestamp) minVersion = 8; diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json index 6f8ff7d6cf9..1a2de6ca30a 100644 --- a/clients/src/main/resources/common/message/ListOffsetsRequest.json +++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json @@ -40,7 +40,9 @@ // Version 9 enables listing offsets by last tiered offset (KIP-1005). 
// // Version 10 enables async remote list offsets support (KIP-1075) - "validVersions": "1-10", + // + // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023) + "validVersions": "1-11", "flexibleVersions": "6+", "latestVersionUnstable": false, "fields": [ diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json index 7f9588847b9..1407273bf4d 100644 --- a/clients/src/main/resources/common/message/ListOffsetsResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json @@ -40,7 +40,9 @@ // Version 9 enables listing offsets by last tiered offset (KIP-1005). // // Version 10 enables async remote list offsets support (KIP-1075) - "validVersions": "1-10", + // + // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023) + "validVersions": "1-11", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3e093c5029a..e7fa11177d3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -8730,6 +8730,34 @@ public class KafkaAdminClientTest { } } + @Test + public void testListOffsetsEarliestPendingUploadSpecSpecMinVersion() throws Exception { + Node node = new Node(0, "localhost", 8120); + List<Node> nodes = Collections.singletonList(node); + List<PartitionInfo> pInfos = new ArrayList<>(); + pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})); + final Cluster cluster = new Cluster( + "mockClusterId", + nodes, + pInfos, + Collections.emptySet(), + Collections.emptySet(), + node); + final TopicPartition tp0 = new TopicPartition("foo", 0); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, + AdminClientConfig.RETRIES_CONFIG, "2")) { + + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestPendingUpload())); + + TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request -> + request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 11 + ), "no listOffsets request has the expected oldestAllowedVersion"); + } + } + private Map<String, FeatureUpdate> makeTestFeatureUpdates() { return Utils.mkMap( Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)), diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java index 2cf4cbc00c9..48542c1a2fd 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java @@ -127,13 +127,16 @@ public class ListOffsetsRequestTest { .forConsumer(false, IsolationLevel.READ_COMMITTED); ListOffsetsRequest.Builder maxTimestampRequestBuilder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false, false); + 
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false, false, false); ListOffsetsRequest.Builder requireEarliestLocalTimestampRequestBuilder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true, false); + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true, false, false); ListOffsetsRequest.Builder requireTieredStorageTimestampRequestBuilder = ListOffsetsRequest.Builder - .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, true); + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, true, false); + + ListOffsetsRequest.Builder requireEarliestPendingUploadTimestampRequestBuilder = ListOffsetsRequest.Builder + .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, false, true); assertEquals((short) 1, consumerRequestBuilder.oldestAllowedVersion()); assertEquals((short) 1, requireTimestampRequestBuilder.oldestAllowedVersion()); @@ -141,5 +144,6 @@ public class ListOffsetsRequestTest { assertEquals((short) 7, maxTimestampRequestBuilder.oldestAllowedVersion()); assertEquals((short) 8, requireEarliestLocalTimestampRequestBuilder.oldestAllowedVersion()); assertEquals((short) 9, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion()); + assertEquals((short) 11, requireEarliestPendingUploadTimestampRequestBuilder.oldestAllowedVersion()); } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 202590ca6f4..070b3e544a6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -172,7 +172,8 @@ object ReplicaManager { ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort, ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort, - ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort, + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP -> 11.toShort ) def createLogReadResult(highWatermark: Long, diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index d30d5a1040e..da54113ae5c 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard} import 
org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.junit.jupiter.api.Assertions._ @@ -2416,6 +2416,193 @@ class UnifiedLogTest { KafkaConfig.fromProps(props) } + @Test + def testFetchEarliestPendingUploadTimestampNoRemoteStorage(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + // Test initial state before any records + assertFetchOffsetBySpecialTimestamp(log, None, new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + + // Append records + val _ = prepareLogWithSequentialRecords(log, recordCount = 2) + + // Test state after records are appended + assertFetchOffsetBySpecialTimestamp(log, None, new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + } + + @Test + def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = { + val logStartOffset = 0 + val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset) + + val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch) + val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch) + val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch) + + doAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of(timestamp) + .filter(_ == timestampAndEpochs.head.timestamp) + .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(timestampAndEpochs.head.leaderEpoch))) + }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache)) + + // Offset 0 (first timestamp) is in remote storage and deleted locally. Offset 1 (second timestamp) is in local storage. + log.updateLocalLogStartOffset(1) + log.updateHighestOffsetInRemoteStorage(0) + + // In the assertions below we test that offset 0 (first timestamp) is only in remote and offset 1 (second timestamp) is in local storage. 
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)), + ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)), + ListOffsetsRequest.LATEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + } + + @Test + def testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion(): Unit = { + val logStartOffset = 0 + val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset) + + val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch) + val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch) + val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch) + + // Offsets upto 1 are in remote storage + doAnswer(ans => { + val timestamp = ans.getArgument(1).asInstanceOf[Long] + Optional.of( + timestamp match { + case x if x == firstTimestamp => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch)) + case x if x == secondTimestamp => new TimestampAndOffset(x, 1L, Optional.of(secondLeaderEpoch)) + case _ => null + } + ) + }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache)) + + // Offsets 0, 1 (first and second timestamps) are in remote storage and not deleted locally. 
+ log.updateLocalLogStartOffset(0) + log.updateHighestOffsetInRemoteStorage(1) + + // In the assertions below we test that offset 0 (first timestamp) and offset 1 (second timestamp) are on both remote and local storage + assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)), + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)), + ListOffsetsRequest.LATEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(thirdLeaderEpoch)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + } + + @Test + def testFetchEarliestPendingUploadTimestampNoSegmentsUploaded(): Unit = { + val logStartOffset = 0 + val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset) + + val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch) + val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch) + val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch) + + // No offsets are in remote storage + doAnswer(_ => Optional.empty[TimestampAndOffset]()) + .when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache)) + + // Offsets 0, 1, 2 (first, second and third timestamps) are in local storage only and not uploaded to remote storage. + log.updateLocalLogStartOffset(0) + log.updateHighestOffsetInRemoteStorage(-1) + + // In the assertions below we test that offset 0 (first timestamp), offset 1 (second timestamp) and offset 2 (third timestamp) are only on the local storage. 
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)), + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)), + ListOffsetsRequest.LATEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + } + + @Test + def testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote(): Unit = { + val logStartOffset = 100 + val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset) + + val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch) + val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch) + val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch) + + // Offsets 100, 101, 102 (first, second and third timestamps) are in local storage and not uploaded to remote storage. + // Tiered storage copy was disabled and then enabled again, because of which the remote log segments are deleted but + // the highest offset in remote storage has become stale + doAnswer(_ => Optional.empty[TimestampAndOffset]()) + .when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition), + anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache)) + + log.updateLocalLogStartOffset(100) + log.updateHighestOffsetInRemoteStorage(50) + + // In the assertions below we test that offset 100 (first timestamp), offset 101 (second timestamp) and offset 102 (third timestamp) are only on the local storage. 
+ assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))), firstTimestamp) + assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))), secondTimestamp) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()), + ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(thirdLeaderEpoch)), + ListOffsetsRequest.LATEST_TIMESTAMP) + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager),new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + } + + private def prepare(logStartOffset: Int): (RemoteLogManager, UnifiedLog, Seq[TimestampAndEpoch]) = { + val config: KafkaConfig = createKafkaConfigWithRLM + val purgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId) + val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig, + 0, + logDir.getAbsolutePath, + "clusterId", + mockTime, + _ => Optional.empty[UnifiedLog](), + (_, _) => {}, + brokerTopicStats, + new Metrics(), + Optional.empty)) + remoteLogManager.setDelayedOperationPurgatory(purgatory) + + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, logStartOffset = logStartOffset, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager)) + + // Verify earliest pending upload offset for empty log + assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset, Optional.empty()), + ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) + + val timestampAndEpochs = prepareLogWithSequentialRecords(log, recordCount = 3) + (remoteLogManager, log, timestampAndEpochs) + } + /** * Test the Log truncate operations */ @@ -4786,6 +4973,44 @@ class UnifiedLogTest { (log, segmentWithOverflow) } + + private def assertFetchOffsetByTimestamp(log: UnifiedLog, remoteLogManagerOpt: Option[RemoteLogManager], expected: Option[TimestampAndOffset], timestamp: Long): Unit = { + val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt) + val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader) + assertTrue(offsetResultHolder.futureHolderOpt.isPresent) + offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone) + assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset) + assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null)) + } + + private def 
assertFetchOffsetBySpecialTimestamp(log: UnifiedLog, remoteLogManagerOpt: Option[RemoteLogManager], expected: TimestampAndOffset, timestamp: Long): Unit = { + val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt) + val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader) + assertEquals(new OffsetResultHolder(expected), offsetResultHolder) + } + + private def getRemoteOffsetReader(remoteLogManagerOpt: Option[Any]): Optional[AsyncOffsetReader] = { + remoteLogManagerOpt match { + case Some(remoteLogManager) => Optional.of(remoteLogManager.asInstanceOf[AsyncOffsetReader]) + case None => Optional.empty[AsyncOffsetReader]() + } + } + + private def prepareLogWithSequentialRecords(log: UnifiedLog, recordCount: Int): Seq[TimestampAndEpoch] = { + val firstTimestamp = mockTime.milliseconds() + + (0 until recordCount).map { i => + val timestampAndEpoch = TimestampAndEpoch(firstTimestamp + i, i) + log.appendAsLeader( + TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = timestampAndEpoch.timestamp), + timestampAndEpoch.leaderEpoch + ) + timestampAndEpoch + } + } + + case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int) } object UnifiedLogTest { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index dd7c5937bdc..ceca9a6a7de 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -283,7 +283,9 @@ public enum MetadataVersion { } public short listOffsetRequestVersion() { - if (this.isAtLeast(IBP_4_0_IV3)) { + if (this.isAtLeast(IBP_4_2_IV1)) { + return 11; + } else if (this.isAtLeast(IBP_4_0_IV3)) { return 10; } else if (this.isAtLeast(IBP_3_9_IV0)) { return 9; diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 508d4bd900b..49a200f6225 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -266,8 +266,8 @@ class MetadataVersionTest { @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testListOffsetsValueVersion(MetadataVersion metadataVersion) { - final short expectedVersion = 10; - if (metadataVersion.isAtLeast(IBP_4_0_IV3)) { + final short expectedVersion = 11; + if (metadataVersion.isAtLeast(IBP_4_2_IV1)) { assertEquals(expectedVersion, metadataVersion.listOffsetRequestVersion()); } else { assertTrue(metadataVersion.listOffsetRequestVersion() < expectedVersion); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index ca32e4f086a..769f59d56dc 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1667,6 +1667,8 @@ public class UnifiedLog implements AutoCloseable { } else { return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))); } + } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) { + return fetchEarliestPendingUploadOffset(remoteOffsetReader); } else if 
(targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. List<LogSegment> segments = logSegments(); @@ -1709,6 +1711,31 @@ public class UnifiedLog implements AutoCloseable { }); } + private OffsetResultHolder fetchEarliestPendingUploadOffset(Optional<AsyncOffsetReader> remoteOffsetReader) { + if (remoteLogEnabled()) { + long curHighestRemoteOffset = highestOffsetInRemoteStorage(); + + if (curHighestRemoteOffset == -1L) { + if (localLogStartOffset() == logStartOffset()) { + // No segments have been uploaded yet + return fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, remoteOffsetReader); + } else { + // Leader currently does not know about the already uploaded segments + return new OffsetResultHolder(Optional.of(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))); + } + } else { + long earliestPendingUploadOffset = Math.max(curHighestRemoteOffset + 1, logStartOffset()); + OptionalInt epochForOffset = leaderEpochCache.epochForOffset(earliestPendingUploadOffset); + Optional<Integer> epochResult = epochForOffset.isPresent() + ? Optional.of(epochForOffset.getAsInt()) + : Optional.empty(); + return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult)); + } + } else { + return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))); + } + } + /** * Checks if the log is empty. * @return Returns True when the log is empty. Otherwise, false. diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java index 8cc9428afbd..ae16d11d8ed 100644 --- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java +++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java @@ -126,7 +126,7 @@ public class GetOffsetShell { .ofType(String.class); timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]") .withRequiredArg() - .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered") + .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or earliest-pending-upload") .ofType(String.class) .defaultsTo("latest"); commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") @@ -276,6 +276,8 @@ public class GetOffsetShell { return OffsetSpec.earliestLocal(); case "latest-tiered": return OffsetSpec.latestTiered(); + case "earliest-pending-upload": + return OffsetSpec.earliestPendingUpload(); default: long timestamp; @@ -283,7 +285,7 @@ public class GetOffsetShell { timestamp = Long.parseLong(listOffsetsTimestamp); } catch (NumberFormatException e) { throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". 
" + - "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp"); + "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or earliest-pending-upload, or a specified long format timestamp"); } if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) { @@ -296,6 +298,8 @@ public class GetOffsetShell { return OffsetSpec.earliestLocal(); } else if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) { return OffsetSpec.latestTiered(); + } else if (timestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) { + return OffsetSpec.earliestPendingUpload(); } else { return OffsetSpec.forTimestamp(timestamp); } diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java index 53c1c4d79c9..db53695a7be 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java @@ -247,7 +247,7 @@ public class GetOffsetShellParsingTest { @Test public void testInvalidOffset() { assertEquals("Malformed time argument foo. " + - "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp", + "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or earliest-pending-upload, or a specified long format timestamp", assertThrows(TerseException.class, () -> GetOffsetShell.parseOffsetSpec("foo")).getMessage()); } diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index 9986daa7f3b..c1c7b27639a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -367,6 +367,30 @@ public class GetOffsetShellTest { } } + @ClusterTemplate("withRemoteStorage") + public void testGetOffsetsByEarliestTieredSpec() throws InterruptedException { + setUp(); + setUpRemoteLogTopics(); + + for (String time : new String[] {"-6", "earliest-pending-upload"}) { + // test topics disable remote log storage + // as remote log disabled, broker returns unknown offset of each topic partition and these + // unknown offsets are ignore by GetOffsetShell, hence we have empty result here. + assertEquals(List.of(), + executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time)); + + // test topics enable remote log storage + TestUtils.waitForCondition(() -> + List.of( + new Row("topicRLS1", 0, 0L), + new Row("topicRLS2", 0, 1L), + new Row("topicRLS3", 0, 2L), + new Row("topicRLS4", 0, 3L)) + .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)), + "testGetOffsetsByEarliestTieredSpec result not match"); + } + } + @ClusterTest public void testGetOffsetsByTimestamp() { setUp();