This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d93d1096c2 KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API (#16584)
8d93d1096c2 is described below

commit 8d93d1096c254cd98743cd51ccacb2dc6a815efc
Author: Abhijeet Kumar <abhijeet.cse....@gmail.com>
AuthorDate: Wed Aug 27 08:34:31 2025 +0530

    KAFKA-17108: Add EarliestPendingUpload offset spec in ListOffsets API (#16584)
    
    This is the first part of the implementation of
    [KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)
    
    The purpose of this pull request is to have the broker return the correct
    offset when it receives -6 as the timestamp in a ListOffsets API request.
    
    Added unit tests for the new timestamp.
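
    For illustration, the new spec can be queried through the Admin client
    roughly as follows (a sketch, assuming the usual
    org.apache.kafka.clients.admin imports, a configured Properties object,
    and a hypothetical topic name):

        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // maps to the -6 sentinel (EARLIEST_PENDING_UPLOAD_TIMESTAMP) on the wire
            ListOffsetsResult result = admin.listOffsets(
                Map.of(tp, OffsetSpec.earliestPendingUpload()));
            long offset = result.partitionResult(tp).get().offset();
        }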
    
    Reviewers: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +
 .../org/apache/kafka/clients/admin/OffsetSpec.java |  10 +
 .../admin/internals/ListOffsetsHandler.java        |   7 +-
 .../kafka/common/requests/ListOffsetsRequest.java  |  11 +-
 .../common/message/ListOffsetsRequest.json         |   4 +-
 .../common/message/ListOffsetsResponse.json        |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  28 +++
 .../common/requests/ListOffsetsRequestTest.java    |  10 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   3 +-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 227 ++++++++++++++++++++-
 .../kafka/server/common/MetadataVersion.java       |   4 +-
 .../kafka/server/common/MetadataVersionTest.java   |   4 +-
 .../kafka/storage/internals/log/UnifiedLog.java    |  27 +++
 .../org/apache/kafka/tools/GetOffsetShell.java     |   8 +-
 .../kafka/tools/GetOffsetShellParsingTest.java     |   2 +-
 .../org/apache/kafka/tools/GetOffsetShellTest.java |  24 +++
 16 files changed, 358 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 270a7124826..90f83eac935 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -5154,6 +5154,8 @@ public class KafkaAdminClient extends AdminClient {
             return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
         } else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
             return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
+        } else if (offsetSpec instanceof OffsetSpec.EarliestPendingUploadSpec) {
+            return ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP;
         }
         return ListOffsetsRequest.LATEST_TIMESTAMP;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 68f94cc493e..ad73c8d51f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -28,6 +28,7 @@ public class OffsetSpec {
     public static class MaxTimestampSpec extends OffsetSpec { }
     public static class EarliestLocalSpec extends OffsetSpec { }
     public static class LatestTieredSpec extends OffsetSpec { }
+    public static class EarliestPendingUploadSpec extends OffsetSpec { }
     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
 
@@ -91,4 +92,13 @@ public class OffsetSpec {
     public static OffsetSpec latestTiered() {
         return new LatestTieredSpec();
     }
+
+    /**
+     * Used to retrieve the earliest offset of records that are pending upload to remote storage.
+     * <br/>
+     * Note: When tiered storage is not enabled, an unknown offset is returned.
+     */
+    public static OffsetSpec earliestPendingUpload() {
+        return new EarliestPendingUploadSpec();
+    }
 }
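
With this addition, the special offset specs map onto the negative sentinel timestamps
-1 through -6 (a summary sketch; the constant names are those defined in ListOffsetsRequest):

    // OffsetSpec.latest()                -> LATEST_TIMESTAMP (-1)
    // OffsetSpec.earliest()              -> EARLIEST_TIMESTAMP (-2)
    // OffsetSpec.maxTimestamp()          -> MAX_TIMESTAMP (-3)
    // OffsetSpec.earliestLocal()         -> EARLIEST_LOCAL_TIMESTAMP (-4)
    // OffsetSpec.latestTiered()          -> LATEST_TIERED_TIMESTAMP (-5)
    // OffsetSpec.earliestPendingUpload() -> EARLIEST_PENDING_UPLOAD_TIMESTAMP (-6)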
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index f7c495d7fd8..a46d6f24a7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -103,12 +103,17 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
             .stream()
             .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP);
 
+        boolean requireEarliestPendingUploadTimestamp = keys
+            .stream()
+            .anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP);
+
         int timeoutMs = options.timeoutMs() != null ? options.timeoutMs() : defaultApiTimeoutMs;
         return ListOffsetsRequest.Builder.forConsumer(true,
                         options.isolationLevel(),
                         supportsMaxTimestamp,
                         requireEarliestLocalTimestamp,
-                        requireTieredStorageTimestamp)
+                        requireTieredStorageTimestamp,
+                        requireEarliestPendingUploadTimestamp)
                 .setTargetTimes(new ArrayList<>(topicsByName.values()))
                 .setTimeoutMs(timeoutMs);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 7415412d050..5862ebdfafc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest {
 
     public static final long LATEST_TIERED_TIMESTAMP = -5L;
 
+    public static final long EARLIEST_PENDING_UPLOAD_TIMESTAMP = -6L;
+
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
 
@@ -58,16 +60,19 @@ public class ListOffsetsRequest extends AbstractRequest {
 
         public static Builder forConsumer(boolean requireTimestamp,
                                           IsolationLevel isolationLevel) {
-            return forConsumer(requireTimestamp, isolationLevel, false, false, false);
+            return forConsumer(requireTimestamp, isolationLevel, false, false, false, false);
         }
 
         public static Builder forConsumer(boolean requireTimestamp,
                                           IsolationLevel isolationLevel,
                                           boolean requireMaxTimestamp,
                                           boolean requireEarliestLocalTimestamp,
-                                          boolean requireTieredStorageTimestamp) {
+                                          boolean requireTieredStorageTimestamp,
+                                          boolean requireEarliestPendingUploadTimestamp) {
             short minVersion = ApiKeys.LIST_OFFSETS.oldestVersion();
-            if (requireTieredStorageTimestamp)
+            if (requireEarliestPendingUploadTimestamp)
+                minVersion = 11;
+            else if (requireTieredStorageTimestamp)
                 minVersion = 9;
             else if (requireEarliestLocalTimestamp)
                 minVersion = 8;
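
As the updated ListOffsetsRequestTest below verifies, requesting the new spec pins
the minimum negotiable request version (a sketch of the expected behaviour):

    ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, false, true);
    // the earliest-pending-upload flag forces version 11 or newer
    assertEquals((short) 11, builder.oldestAllowedVersion());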
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 6f8ff7d6cf9..1a2de6ca30a 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -40,7 +40,9 @@
   // Version 9 enables listing offsets by last tiered offset (KIP-1005).
   //
   // Version 10 enables async remote list offsets support (KIP-1075)
-  "validVersions": "1-10",
+  //
+  // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
+  "validVersions": "1-11",
   "flexibleVersions": "6+",
   "latestVersionUnstable": false,
   "fields": [
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 7f9588847b9..1407273bf4d 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -40,7 +40,9 @@
   // Version 9 enables listing offsets by last tiered offset (KIP-1005).
   //
   // Version 10 enables async remote list offsets support (KIP-1075)
-  "validVersions": "1-10",
+  //
+  // Version 11 enables listing offsets by earliest pending upload offset (KIP-1023)
+  "validVersions": "1-11",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", 
"ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3e093c5029a..e7fa11177d3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -8730,6 +8730,34 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListOffsetsEarliestPendingUploadSpecSpecMinVersion() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+
+            env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestPendingUpload()));
+
+            TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
+                request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 11
+            ), "no listOffsets request has the expected oldestAllowedVersion");
+        }
+    }
+
     private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
         return Utils.mkMap(
            Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index 2cf4cbc00c9..48542c1a2fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -127,13 +127,16 @@ public class ListOffsetsRequestTest {
             .forConsumer(false, IsolationLevel.READ_COMMITTED);
 
         ListOffsetsRequest.Builder maxTimestampRequestBuilder = ListOffsetsRequest.Builder
-            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false, false);
+            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true, false, false, false);
 
         ListOffsetsRequest.Builder requireEarliestLocalTimestampRequestBuilder = ListOffsetsRequest.Builder
-            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true, false);
+            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, true, false, false);
 
         ListOffsetsRequest.Builder requireTieredStorageTimestampRequestBuilder = ListOffsetsRequest.Builder
-            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, true);
+            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, true, false);
+
+        ListOffsetsRequest.Builder requireEarliestPendingUploadTimestampRequestBuilder = ListOffsetsRequest.Builder
+            .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false, false, false, true);
 
         assertEquals((short) 1, consumerRequestBuilder.oldestAllowedVersion());
         assertEquals((short) 1, requireTimestampRequestBuilder.oldestAllowedVersion());
@@ -141,5 +144,6 @@ public class ListOffsetsRequestTest {
         assertEquals((short) 7, maxTimestampRequestBuilder.oldestAllowedVersion());
         assertEquals((short) 8, requireEarliestLocalTimestampRequestBuilder.oldestAllowedVersion());
         assertEquals((short) 9, requireTieredStorageTimestampRequestBuilder.oldestAllowedVersion());
+        assertEquals((short) 11, requireEarliestPendingUploadTimestampRequestBuilder.oldestAllowedVersion());
     }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 202590ca6f4..070b3e544a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -172,7 +172,8 @@ object ReplicaManager {
     ListOffsetsRequest.LATEST_TIMESTAMP -> 1.toShort,
     ListOffsetsRequest.MAX_TIMESTAMP -> 7.toShort,
     ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP -> 8.toShort,
-    ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort
+    ListOffsetsRequest.LATEST_TIERED_TIMESTAMP -> 9.toShort,
+    ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP -> 11.toShort
   )
 
   def createLogReadResult(highWatermark: Long,
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index d30d5a1040e..da54113ae5c 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffs
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, AsyncOffsetReader, Cleaner, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
 import org.junit.jupiter.api.Assertions._
@@ -2416,6 +2416,193 @@ class UnifiedLogTest {
     KafkaConfig.fromProps(props)
   }
 
+  @Test
+  def testFetchEarliestPendingUploadTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    // Test initial state before any records
+    assertFetchOffsetBySpecialTimestamp(log, None, new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+
+    // Append records
+    val _ = prepareLogWithSequentialRecords(log, recordCount = 2)
+
+    // Test state after records are appended
+    assertFetchOffsetBySpecialTimestamp(log, None, new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1, Optional.of(-1)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
+  @Test
+  def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = {
+    val logStartOffset = 0
+    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+    val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+    val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch)
+
+    doAnswer(ans => {
+      val timestamp = ans.getArgument(1).asInstanceOf[Long]
+      Optional.of(timestamp)
+        .filter(_ == timestampAndEpochs.head.timestamp)
+        .map[TimestampAndOffset](x => new TimestampAndOffset(x, 0L, Optional.of(timestampAndEpochs.head.leaderEpoch)))
+    }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+      anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+    // Offset 0 (first timestamp) is in remote storage and deleted locally. Offset 1 (second timestamp) is in local storage.
+    log.updateLocalLogStartOffset(1)
+    log.updateHighestOffsetInRemoteStorage(0)
+
+    // In the assertions below we test that offset 0 (first timestamp) is only in remote storage and offset 1 (second timestamp) is in local storage.
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
+  @Test
+  def testFetchEarliestPendingUploadTimestampWithRemoteStorageNoLocalDeletion(): Unit = {
+    val logStartOffset = 0
+    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+    val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+    val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch)
+
+    // Offsets up to 1 are in remote storage
+    doAnswer(ans => {
+      val timestamp = ans.getArgument(1).asInstanceOf[Long]
+      Optional.of(
+        timestamp match {
+          case x if x == firstTimestamp => new TimestampAndOffset(x, 0L, Optional.of(firstLeaderEpoch))
+          case x if x == secondTimestamp => new TimestampAndOffset(x, 1L, Optional.of(secondLeaderEpoch))
+          case _ => null
+        }
+      )
+    }).when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+      anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+    // Offsets 0, 1 (first and second timestamps) are in remote storage and not deleted locally.
+    log.updateLocalLogStartOffset(0)
+    log.updateHighestOffsetInRemoteStorage(1)
+
+    // In the assertions below we test that offset 0 (first timestamp) and offset 1 (second timestamp) are in both remote and local storage.
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(thirdLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
+  @Test
+  def testFetchEarliestPendingUploadTimestampNoSegmentsUploaded(): Unit = {
+    val logStartOffset = 0
+    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+    val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+    val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch)
+
+    // No offsets are in remote storage
+    doAnswer(_ => Optional.empty[TimestampAndOffset]())
+      .when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+        anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+    // Offsets 0, 1, 2 (first, second and third timestamps) are in local storage only and not uploaded to remote storage.
+    log.updateLocalLogStartOffset(0)
+    log.updateHighestOffsetInRemoteStorage(-1)
+
+    // In the assertions below we test that offset 0 (first timestamp), offset 1 (second timestamp) and offset 2 (third timestamp) are only in local storage.
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))), firstTimestamp)
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))), secondTimestamp)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1L, Optional.of(-1)),
+      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 3L, Optional.of(thirdLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
+  @Test
+  def testFetchEarliestPendingUploadTimestampStaleHighestOffsetInRemote(): Unit = {
+    val logStartOffset = 100
+    val (remoteLogManager: RemoteLogManager, log: UnifiedLog, timestampAndEpochs: Seq[TimestampAndEpoch]) = prepare(logStartOffset)
+
+    val (firstTimestamp, firstLeaderEpoch) = (timestampAndEpochs.head.timestamp, timestampAndEpochs.head.leaderEpoch)
+    val (secondTimestamp, secondLeaderEpoch) = (timestampAndEpochs(1).timestamp, timestampAndEpochs(1).leaderEpoch)
+    val (_, thirdLeaderEpoch) = (timestampAndEpochs(2).timestamp, timestampAndEpochs(2).leaderEpoch)
+
+    // Offsets 100, 101, 102 (first, second and third timestamps) are in local storage and not uploaded to remote storage.
+    // Tiered storage copy was disabled and then re-enabled, so the remote log segments were deleted but
+    // the highest offset in remote storage has become stale.
+    doAnswer(_ => Optional.empty[TimestampAndOffset]())
+      .when(remoteLogManager).findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+        anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache))
+
+    log.updateLocalLogStartOffset(100)
+    log.updateHighestOffsetInRemoteStorage(50)
+
+    // In the assertions below we test that offset 100 (first timestamp), offset 101 (second timestamp) and offset 102 (third timestamp) are only in local storage.
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(firstTimestamp, 100L, Optional.of(firstLeaderEpoch))), firstTimestamp)
+    assertFetchOffsetByTimestamp(log, Some(remoteLogManager), Some(new TimestampAndOffset(secondTimestamp, 101L, Optional.of(secondLeaderEpoch))), secondTimestamp)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 50L, Optional.empty()),
+      ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 103L, Optional.of(thirdLeaderEpoch)),
+      ListOffsetsRequest.LATEST_TIMESTAMP)
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 100L, Optional.of(firstLeaderEpoch)),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+  }
+
+  private def prepare(logStartOffset: Int): (RemoteLogManager, UnifiedLog, Seq[TimestampAndEpoch]) = {
+    val config: KafkaConfig = createKafkaConfigWithRLM
+    val purgatory = new DelayedOperationPurgatory[DelayedRemoteListOffsets]("RemoteListOffsets", config.brokerId)
+    val remoteLogManager = spy(new RemoteLogManager(config.remoteLogManagerConfig,
+      0,
+      logDir.getAbsolutePath,
+      "clusterId",
+      mockTime,
+      _ => Optional.empty[UnifiedLog](),
+      (_, _) => {},
+      brokerTopicStats,
+      new Metrics(),
+      Optional.empty))
+    remoteLogManager.setDelayedOperationPurgatory(purgatory)
+
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, logStartOffset = logStartOffset, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
+
+    // Verify earliest pending upload offset for empty log
+    assertFetchOffsetBySpecialTimestamp(log, Some(remoteLogManager), new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, logStartOffset, Optional.empty()),
+      ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP)
+
+    val timestampAndEpochs = prepareLogWithSequentialRecords(log, recordCount = 3)
+    (remoteLogManager, log, timestampAndEpochs)
+  }
+
   /**
    * Test the Log truncate operations
    */
@@ -4786,6 +4973,44 @@ class UnifiedLogTest {
 
     (log, segmentWithOverflow)
   }
+
+  private def assertFetchOffsetByTimestamp(log: UnifiedLog, remoteLogManagerOpt: Option[RemoteLogManager], expected: Option[TimestampAndOffset], timestamp: Long): Unit = {
+    val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
+    val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader)
+    assertTrue(offsetResultHolder.futureHolderOpt.isPresent)
+    offsetResultHolder.futureHolderOpt.get.taskFuture.get(1, TimeUnit.SECONDS)
+    assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.isDone)
+    assertTrue(offsetResultHolder.futureHolderOpt.get.taskFuture.get().hasTimestampAndOffset)
+    assertEquals(expected.get, offsetResultHolder.futureHolderOpt.get.taskFuture.get().timestampAndOffset().orElse(null))
+  }
+
+  private def assertFetchOffsetBySpecialTimestamp(log: UnifiedLog, remoteLogManagerOpt: Option[RemoteLogManager], expected: TimestampAndOffset, timestamp: Long): Unit = {
+    val remoteOffsetReader = getRemoteOffsetReader(remoteLogManagerOpt)
+    val offsetResultHolder = log.fetchOffsetByTimestamp(timestamp, remoteOffsetReader)
+    assertEquals(new OffsetResultHolder(expected), offsetResultHolder)
+  }
+
+  private def getRemoteOffsetReader(remoteLogManagerOpt: Option[Any]): Optional[AsyncOffsetReader] = {
+    remoteLogManagerOpt match {
+      case Some(remoteLogManager) => Optional.of(remoteLogManager.asInstanceOf[AsyncOffsetReader])
+      case None => Optional.empty[AsyncOffsetReader]()
+    }
+  }
+
+  private def prepareLogWithSequentialRecords(log: UnifiedLog, recordCount: Int): Seq[TimestampAndEpoch] = {
+    val firstTimestamp = mockTime.milliseconds()
+
+    (0 until recordCount).map { i =>
+      val timestampAndEpoch = TimestampAndEpoch(firstTimestamp + i, i)
+      log.appendAsLeader(
+        TestUtils.singletonRecords(value = TestUtils.randomBytes(10), timestamp = timestampAndEpoch.timestamp),
+        timestampAndEpoch.leaderEpoch
+      )
+      timestampAndEpoch
+    }
+  }
+
+  case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
 }
 
 object UnifiedLogTest {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index dd7c5937bdc..ceca9a6a7de 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -283,7 +283,9 @@ public enum MetadataVersion {
     }
 
     public short listOffsetRequestVersion() {
-        if (this.isAtLeast(IBP_4_0_IV3)) {
+        if (this.isAtLeast(IBP_4_2_IV1)) {
+            return 11;
+        } else if (this.isAtLeast(IBP_4_0_IV3)) {
             return 10;
         } else if (this.isAtLeast(IBP_3_9_IV0)) {
             return 9;
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 508d4bd900b..49a200f6225 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -266,8 +266,8 @@ class MetadataVersionTest {
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class)
     public void testListOffsetsValueVersion(MetadataVersion metadataVersion) {
-        final short expectedVersion = 10;
-        if (metadataVersion.isAtLeast(IBP_4_0_IV3)) {
+        final short expectedVersion = 11;
+        if (metadataVersion.isAtLeast(IBP_4_2_IV1)) {
            assertEquals(expectedVersion, metadataVersion.listOffsetRequestVersion());
         } else {
            assertTrue(metadataVersion.listOffsetRequestVersion() < expectedVersion);
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index ca32e4f086a..769f59d56dc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1667,6 +1667,8 @@ public class UnifiedLog implements AutoCloseable {
                         } else {
                             return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)));
                         }
+                    } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) {
+                        return fetchEarliestPendingUploadOffset(remoteOffsetReader);
                     } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
                         // Cache to avoid race conditions.
                         List<LogSegment> segments = logSegments();
@@ -1709,6 +1711,31 @@ public class UnifiedLog implements AutoCloseable {
                 });
     }
 
+    private OffsetResultHolder fetchEarliestPendingUploadOffset(Optional<AsyncOffsetReader> remoteOffsetReader) {
+        if (remoteLogEnabled()) {
+            long curHighestRemoteOffset = highestOffsetInRemoteStorage();
+
+            if (curHighestRemoteOffset == -1L) {
+                if (localLogStartOffset() == logStartOffset()) {
+                    // No segments have been uploaded yet
+                    return fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, remoteOffsetReader);
+                } else {
+                    // Leader currently does not know about the already uploaded segments
+                    return new OffsetResultHolder(Optional.of(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))));
+                }
+            } else {
+                long earliestPendingUploadOffset = Math.max(curHighestRemoteOffset + 1, logStartOffset());
+                OptionalInt epochForOffset = leaderEpochCache.epochForOffset(earliestPendingUploadOffset);
+                Optional<Integer> epochResult = epochForOffset.isPresent()
+                    ? Optional.of(epochForOffset.getAsInt())
+                    : Optional.empty();
+                return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult));
+            }
+        } else {
+            return new OffsetResultHolder(new FileRecords.TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)));
+        }
+    }
+
     /**
      * Checks if the log is empty.
      * @return Returns True when the log is empty. Otherwise, false.
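
A quick worked example of the arithmetic above, using the values from the
stale-highest-offset unit test added in this patch: with logStartOffset() == 100
and a stale highestOffsetInRemoteStorage() == 50, the result is
max(50 + 1, 100) = 100, i.e. the returned offset is clamped to the log start
offset when the highest remote offset lags behind it.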
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index 8cc9428afbd..ae16d11d8ed 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -126,7 +126,7 @@ public class GetOffsetShell {
                     .ofType(String.class);
            timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
                     .withRequiredArg()
-                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered")
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or earliest-pending-upload")
                     .ofType(String.class)
                     .defaultsTo("latest");
            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
@@ -276,6 +276,8 @@ public class GetOffsetShell {
                 return OffsetSpec.earliestLocal();
             case "latest-tiered":
                 return OffsetSpec.latestTiered();
+            case "earliest-pending-upload":
+                return OffsetSpec.earliestPendingUpload();
             default:
                 long timestamp;
 
@@ -283,7 +285,7 @@ public class GetOffsetShell {
                     timestamp = Long.parseLong(listOffsetsTimestamp);
                 } catch (NumberFormatException e) {
                    throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
-                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp");
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or earliest-pending-upload, or a specified long format timestamp");
                 }
 
                 if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
@@ -296,6 +298,8 @@ public class GetOffsetShell {
                     return OffsetSpec.earliestLocal();
                } else if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
                     return OffsetSpec.latestTiered();
+                } else if (timestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_TIMESTAMP) {
+                    return OffsetSpec.earliestPendingUpload();
                 } else {
                     return OffsetSpec.forTimestamp(timestamp);
                 }
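
For reference, the new spec can be exercised from the command line roughly as
follows (script name as shipped in the Kafka distribution; the topic pattern is
hypothetical):

    bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 \
        --topic-partitions 'topicRLS.*:0' --time earliest-pending-upload

Passing --time -6 is an equivalent spelling, matching the other negative
sentinel timestamps.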
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
index 53c1c4d79c9..db53695a7be 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
@@ -247,7 +247,7 @@ public class GetOffsetShellParsingTest {
     @Test
     public void testInvalidOffset() {
         assertEquals("Malformed time argument foo. " +
-                        "Please use -1 or latest / -2 or earliest / -3 or 
max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long 
format timestamp",
+                        "Please use -1 or latest / -2 or earliest / -3 or 
max-timestamp / -4 or earliest-local / -5 or latest-tiered / -6 or 
earliest-pending-upload, or a specified long format timestamp",
                 assertThrows(TerseException.class, () -> 
GetOffsetShell.parseOffsetSpec("foo")).getMessage());
     }
 
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 9986daa7f3b..c1c7b27639a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -367,6 +367,30 @@ public class GetOffsetShellTest {
         }
     }
 
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByEarliestPendingUploadSpec() throws InterruptedException {
+        setUp();
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-6", "earliest-pending-upload"}) {
+            // test topics with remote log storage disabled
+            // since remote log storage is disabled, the broker returns an unknown offset for each
+            // topic partition; these unknown offsets are ignored by GetOffsetShell, hence the empty result.
+            assertEquals(List.of(),
+                executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
+
+            // test topics with remote log storage enabled
+            TestUtils.waitForCondition(() ->
+                    List.of(
+                            new Row("topicRLS1", 0, 0L),
+                            new Row("topicRLS2", 0, 1L),
+                            new Row("topicRLS3", 0, 2L),
+                            new Row("topicRLS4", 0, 3L))
+                            .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+                    "testGetOffsetsByEarliestPendingUploadSpec result does not match");
+        }
+    }
+
     @ClusterTest
     public void testGetOffsetsByTimestamp() {
         setUp();

