This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84add30ea54 KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP (#16783)
84add30ea54 is described below

commit 84add30ea549dbd87a239a443ce36d3e10ce336a
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Mon Aug 5 10:41:14 2024 +0800

    KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP (#16783)
    
    This PR supports EarliestLocalSpec and LatestTieredSpec in GetOffsetShell, and adds integration tests.
    
    Reviewers: Luke Chen <[email protected]>, Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
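
    For reference, a minimal Admin-client sketch of the two new offset specs
    (broker address and topic name are illustrative; when tiered storage is
    disabled, latest-tiered may come back as unknown, i.e. -1):

        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.OffsetSpec;
        import org.apache.kafka.common.TopicPartition;

        public class TieredOffsetsExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    TopicPartition tp = new TopicPartition("topicRLS2", 0);
                    // first offset still served from the leader's local disk
                    long earliestLocal = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.earliestLocal()))
                            .partitionResult(tp).get().offset();
                    // highest offset already uploaded to remote storage; -1 (unknown) without tiered storage
                    long latestTiered = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.latestTiered()))
                            .partitionResult(tp).get().offset();
                    System.out.println("earliest-local=" + earliestLocal + ", latest-tiered=" + latestTiered);
                }
            }
        }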
---
 build.gradle                                       |   1 +
 checkstyle/import-control.xml                      |   2 +
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +-
 .../org/apache/kafka/clients/admin/OffsetSpec.java |  22 ++--
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 .../org/apache/kafka/tools/GetOffsetShell.java     |  15 ++-
 .../kafka/tools/GetOffsetShellParsingTest.java     |   8 ++
 .../org/apache/kafka/tools/GetOffsetShellTest.java | 127 +++++++++++++++++++--
 8 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/build.gradle b/build.gradle
index 51f9659e587..becac56bf88 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2141,6 +2141,7 @@ project(':tools') {
     testImplementation project(':connect:runtime')
     testImplementation project(':connect:runtime').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.main.output
+    testImplementation project(':storage').sourceSets.test.output
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a5784ef935c..3f8212f9976 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -284,6 +284,8 @@
     <allow pkg="org.apache.kafka.storage.internals" />
     <allow pkg="org.apache.kafka.server.config" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
+    <allow pkg="org.apache.kafka.server.log.remote.storage" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8eb7fb4e8c0..2f195489add 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4862,7 +4862,7 @@ public class KafkaAdminClient extends AdminClient {
             return ListOffsetsRequest.MAX_TIMESTAMP;
         } else if (offsetSpec instanceof OffsetSpec.EarliestLocalSpec) {
             return ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP;
-        } else if (offsetSpec instanceof OffsetSpec.LatestTierSpec) {
+        } else if (offsetSpec instanceof OffsetSpec.LatestTieredSpec) {
             return ListOffsetsRequest.LATEST_TIERED_TIMESTAMP;
         }
         return ListOffsetsRequest.LATEST_TIMESTAMP;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 5b2fbb3e2e9..68f94cc493e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -27,7 +27,7 @@ public class OffsetSpec {
     public static class LatestSpec extends OffsetSpec { }
     public static class MaxTimestampSpec extends OffsetSpec { }
     public static class EarliestLocalSpec extends OffsetSpec { }
-    public static class LatestTierSpec extends OffsetSpec { }
+    public static class LatestTieredSpec extends OffsetSpec { }
     public static class TimestampSpec extends OffsetSpec {
         private final long timestamp;
 
@@ -73,20 +73,22 @@ public class OffsetSpec {
     }
 
     /**
-     * Used to retrieve the offset with the local log start offset,
-     * log start offset is the offset of a log above which reads are guaranteed to be served
-     * from the disk of the leader broker, when Tiered Storage is not enabled, it behaves the same
-     * as the earliest timestamp
+     * Used to retrieve the local log start offset.
+     * Local log start offset is the offset of a log above which reads
+     * are guaranteed to be served from the disk of the leader broker.
+     * <br/>
+     * Note: When tiered storage is not enabled, it behaves the same as retrieving the earliest timestamp offset.
      */
-    public static OffsetSpec earliestLocalSpec() {
+    public static OffsetSpec earliestLocal() {
         return new EarliestLocalSpec();
     }
 
     /**
-     * Used to retrieve the offset with the highest offset of data stored in remote storage,
-     * and when Tiered Storage is not enabled, we won't return any offset (i.e. Unknown offset)
+     * Used to retrieve the highest offset of data stored in remote storage.
+     * <br/>
+     * Note: When tiered storage is not enabled, we will return an unknown offset.
      */
-    public static OffsetSpec latestTierSpec() {
-        return new LatestTierSpec();
+    public static OffsetSpec latestTiered() {
+        return new LatestTieredSpec();
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 8d70e60fc05..dc74a178442 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5864,7 +5864,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
 
-            env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocalSpec()));
+            env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.earliestLocal()));
 
             TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
                 request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
@@ -5892,7 +5892,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
 
-            env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTierSpec()));
+            env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latestTiered()));
 
             TestUtils.waitForCondition(() -> env.kafkaClient().requests().stream().anyMatch(request ->
                     request.requestBuilder().apiKey().messageType == ApiMessageType.LIST_OFFSETS && request.requestBuilder().oldestAllowedVersion() == 9
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index 4ba0f6c3e3c..60b78acd22b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -132,7 +132,7 @@ public class GetOffsetShell {
                     .ofType(String.class);
             timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]")
                     .withRequiredArg()
-                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp")
+                    .describedAs("<timestamp> / -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered")
                     .ofType(String.class)
                     .defaultsTo("latest");
             commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
@@ -275,7 +275,8 @@ public class GetOffsetShell {
         }
     }
 
-    private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
+    // visible for testing
+    static OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException {
         switch (listOffsetsTimestamp) {
             case "earliest":
                 return OffsetSpec.earliest();
@@ -283,6 +284,10 @@ public class GetOffsetShell {
                 return OffsetSpec.latest();
             case "max-timestamp":
                 return OffsetSpec.maxTimestamp();
+            case "earliest-local":
+                return OffsetSpec.earliestLocal();
+            case "latest-tiered":
+                return OffsetSpec.latestTiered();
             default:
                 long timestamp;
 
@@ -290,7 +295,7 @@ public class GetOffsetShell {
                     timestamp = Long.parseLong(listOffsetsTimestamp);
                 } catch (NumberFormatException e) {
                     throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " +
-                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp");
+                            "Please use -1 or latest / -2 or earliest / -3 or max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long format timestamp");
                 }
 
                 if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
@@ -299,6 +304,10 @@ public class GetOffsetShell {
                     return OffsetSpec.latest();
                 } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
                     return OffsetSpec.maxTimestamp();
+                } else if (timestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
+                    return OffsetSpec.earliestLocal();
+                } else if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+                    return OffsetSpec.latestTiered();
                 } else {
                     return OffsetSpec.forTimestamp(timestamp);
                 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
index 3c4ef0894f7..9e81c23f309 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.server.util.TopicPartitionFilter;
 
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -243,6 +244,13 @@ public class GetOffsetShellParsingTest {
         assertThrows(TerseException.class, () -> GetOffsetShell.execute("--bootstrap-server", "localhost:9092", "--time", "invalid"));
     }
 
+    @Test
+    public void testInvalidOffset() {
+        assertEquals("Malformed time argument foo. " +
+                        "Please use -1 or latest / -2 or earliest / -3 or 
max-timestamp / -4 or earliest-local / -5 or latest-tiered, or a specified long 
format timestamp",
+                assertThrows(TerseException.class, () -> 
GetOffsetShell.parseOffsetSpec("foo")).getMessage());
+    }
+
     private TopicPartition getTopicPartition(String topic, Integer partition) {
         return new TopicPartition(topic, partition);
     }
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 2d588c60257..95007d7bf85 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.kafka.tools;
 
+import kafka.test.ClusterConfig;
 import kafka.test.ClusterInstance;
 import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTemplate;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.junit.ClusterTestExtensions;
@@ -31,23 +33,36 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static kafka.test.annotation.Type.CO_KRAFT;
+import static kafka.test.annotation.Type.KRAFT;
+import static kafka.test.annotation.Type.ZK;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -70,15 +85,41 @@ public class GetOffsetShellTest {
         return "topic" + i;
     }
 
+    private String getRemoteLogStorageEnabledTopicName(int i) {
+        return "topicRLS" + i;
+    }
+
     private void setUp() {
+        setupTopics(this::getTopicName, Collections.emptyMap());
+        sendProducerRecords(this::getTopicName);
+    }
+
+    private void setUpRemoteLogTopics() {
+        // In this method, we'll create 4 topics and produce records to the log like this:
+        // topicRLS1 -> 1 segment
+        // topicRLS2 -> 2 segments (1 local log segment + 1 segment in the remote storage)
+        // topicRLS3 -> 3 segments (1 local log segment + 2 segments in the remote storage)
+        // topicRLS4 -> 4 segments (1 local log segment + 3 segments in the remote storage)
+        Map<String, String> rlsConfigs = new HashMap<>();
+        rlsConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true");
+        rlsConfigs.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        rlsConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "100");
+        setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
+        sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
+    }
+
+    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
         try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
-            IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
+            IntStream.range(0, topicCount + 1).forEach(i ->
+                    topics.add(new NewTopic(topicName.apply(i), i, (short) 1).configs(configs)));
 
             admin.createTopics(topics);
         }
+    }
 
+    private void sendProducerRecords(Function<Integer, String> topicName) {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -86,15 +127,34 @@ public class GetOffsetShellTest {
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
-                .forEach(i -> IntStream.range(0, i * i)
-                        .forEach(msgCount -> {
-                            assertDoesNotThrow(() -> producer.send(
-                                    new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
-                        })
-                );
+                    .forEach(i -> IntStream.range(0, i * i)
+                            .forEach(msgCount -> assertDoesNotThrow(() -> producer.send(
+                                    new ProducerRecord<>(topicName.apply(i), msgCount % i, null, "val" + msgCount)).get())));
         }
     }
 
+    private static List<ClusterConfig> withRemoteStorage() {
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, "1");
+        serverProperties.put(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, "1");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, "1000");
+        serverProperties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
+        serverProperties.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "EXTERNAL");
+
+        return Collections.singletonList(
+                // we set REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP to EXTERNAL, so we need to
+                // align the listener name here: in KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
+                // broker listener name is EXTERNAL, while in ZK it is PLAINTEXT
+                ClusterConfig.defaultBuilder()
+                        .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
+                        .setServerProperties(serverProperties)
+                        .setListenerName("EXTERNAL")
+                        .build());
+    }
+
     private void createConsumerAndPoll() {
         Properties props = new Properties();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
@@ -274,6 +334,59 @@ public class GetOffsetShellTest {
         }
     }
 
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByEarliestLocalSpec() throws InterruptedException {
+        setUp();
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-4", "earliest-local"}) {
+            // test topics with remote log storage disabled
+            // since remote log is disabled, the broker returns the same result as the earliest offset
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topic1", 0, 0L),
+                            new Row("topic2", 0, 0L),
+                            new Row("topic3", 0, 0L),
+                            new Row("topic4", 0, 0L))
+                            .equals(executeAndParse("--topic-partitions", "topic\\d+.*:0", "--time", time)),
+                    "testGetOffsetsByEarliestLocalSpec: result for topics with remote log disabled did not match");
+
+            // test topics with remote log storage enabled
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS1", 0, 0L),
+                            new Row("topicRLS2", 0, 1L),
+                            new Row("topicRLS3", 0, 2L),
+                            new Row("topicRLS4", 0, 3L))
+                            .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+                    "testGetOffsetsByEarliestLocalSpec: result for topics with remote log enabled did not match");
+        }
+    }
+
+    @ClusterTemplate("withRemoteStorage")
+    public void testGetOffsetsByLatestTieredSpec() throws InterruptedException {
+        setUp();
+        setUpRemoteLogTopics();
+
+        for (String time : new String[] {"-5", "latest-tiered"}) {
+            // test topics with remote log storage disabled
+            // since remote log is not enabled, the broker returns an unknown offset for each topic
+            // partition; these unknown offsets are ignored by GetOffsetShell, hence the empty result here.
+            assertEquals(Collections.emptyList(),
+                    executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
+
+            // test topics with remote log storage enabled
+            // topicRLS1 has no result because no log segments have been uploaded to remote storage
+            TestUtils.waitForCondition(() ->
+                    Arrays.asList(
+                            new Row("topicRLS2", 0, 0L),
+                            new Row("topicRLS3", 0, 1L),
+                            new Row("topicRLS4", 0, 2L))
+                            .equals(executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)),
+                    "testGetOffsetsByLatestTieredSpec: result did not match");
+        }
+    }
+
     @ClusterTest
     public void testGetOffsetsByTimestamp() {
         setUp();

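For a quick command-line check of the same specs, GetOffsetShell can be exercised through the standard wrapper script (assuming bin/kafka-get-offsets.sh from a stock distribution; host and topic pattern are illustrative):

    $ bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 \
        --topic-partitions "topicRLS.*:0" --time earliest-local    # or: --time -4
    $ bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 \
        --topic-partitions "topicRLS.*:0" --time latest-tiered     # or: --time -5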