This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bd72ef1 KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)
bd72ef1 is described below
commit bd72ef1bf1e40feb3bc17349a385b479fa5fa530
Author: thomaskwscott <[email protected]>
AuthorDate: Fri Jun 25 13:29:12 2021 +0100
KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734) (#10760)
This patch implements KIP-734 as described in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp.
Reviewers: David Jacot <[email protected]>
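
For anyone trying the feature out, a minimal usage sketch of the new spec from the AdminClient (the bootstrap address and topic name are placeholders, not part of this patch):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.TopicPartition;

    public class MaxTimestampExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("foo", 0); // placeholder topic
                // OffsetSpec.maxTimestamp() asks for the offset of the record with the
                // largest timestamp, which may sit before the log end offset.
                ListOffsetsResultInfo info = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()))
                    .partitionResult(tp)
                    .get();
                System.out.println("offset=" + info.offset() + " timestamp=" + info.timestamp());
            }
        }
    }

This requires ListOffsets v7 on the partition leader; against older brokers the returned future fails with UnsupportedVersionException (see the KafkaAdminClient changes below).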
---
.../kafka/clients/admin/KafkaAdminClient.java | 51 ++++++++-
.../org/apache/kafka/clients/admin/OffsetSpec.java | 10 ++
.../kafka/clients/consumer/internals/Fetcher.java | 2 +-
.../kafka/common/requests/ListOffsetsRequest.java | 7 +-
.../common/message/ListOffsetsRequest.json | 4 +-
.../common/message/ListOffsetsResponse.json | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 122 ++++++++++++++++++++-
.../common/requests/ListOffsetsRequestTest.java | 4 +-
.../kafka/common/requests/RequestResponseTest.java | 6 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 11 +-
core/src/main/scala/kafka/log/Log.scala | 10 ++
.../scala/kafka/server/ReplicaFetcherThread.scala | 3 +-
.../kafka/admin/ListOffsetsIntegrationTest.scala | 95 ++++++++++++++++
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 29 +++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../unit/kafka/server/ListOffsetsRequestTest.scala | 33 +++++-
.../scala/unit/kafka/server/LogOffsetTest.scala | 82 +++++++++++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 5 +-
.../jmh/common/ListOffsetRequestBenchmark.java | 2 +-
21 files changed, 441 insertions(+), 47 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6477bd0..7a34541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4209,11 +4209,7 @@ public class KafkaAdminClient extends AdminClient {
OffsetSpec offsetSpec = entry.getValue();
TopicPartition tp = entry.getKey();
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
- long offsetQuery = (offsetSpec instanceof TimestampSpec)
- ? ((TimestampSpec) offsetSpec).timestamp()
- : (offsetSpec instanceof OffsetSpec.EarliestSpec)
- ? ListOffsetsRequest.EARLIEST_TIMESTAMP
- : ListOffsetsRequest.LATEST_TIMESTAMP;
+ long offsetQuery = getOffsetFromOffsetSpec(offsetSpec);
// avoid sending listOffsets request for topics with errors
if (!mr.errors().containsKey(tp.topic())) {
Node node = mr.cluster().leaderFor(tp);
@@ -4236,10 +4232,12 @@ public class KafkaAdminClient extends AdminClient {
final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+ private boolean supportsMaxTimestamp = true;
+
@Override
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
return ListOffsetsRequest.Builder
- .forConsumer(true, context.options().isolationLevel())
+ .forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
.setTargetTimes(partitionsToQuery);
}
@@ -4298,6 +4296,36 @@ public class KafkaAdminClient extends AdminClient {
}
}
}
+
+ @Override
+ boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+ if (supportsMaxTimestamp) {
+ supportsMaxTimestamp = false;
+
+ // fail any unsupported futures and remove partitions from the downgraded retry
+ boolean foundMaxTimestampPartition = false;
+ Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
+ while (topicIterator.hasNext()) {
+ ListOffsetsTopic topic = topicIterator.next();
+ Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
+ while (partitionIterator.hasNext()) {
+ ListOffsetsPartition partition = partitionIterator.next();
+ if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
+ foundMaxTimestampPartition = true;
+ futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
+ .completeExceptionally(new UnsupportedVersionException(
+ "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
+ partitionIterator.remove();
+ }
+ }
+ if (topic.partitions().isEmpty()) {
+ topicIterator.remove();
+ }
+ }
+ return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
+ }
+ return false;
+ }
});
}
return calls;
@@ -4834,6 +4862,17 @@ public class KafkaAdminClient extends AdminClient {
};
}
+ private long getOffsetFromOffsetSpec(OffsetSpec offsetSpec) {
+ if (offsetSpec instanceof TimestampSpec) {
+ return ((TimestampSpec) offsetSpec).timestamp();
+ } else if (offsetSpec instanceof OffsetSpec.EarliestSpec) {
+ return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+ } else if (offsetSpec instanceof OffsetSpec.MaxTimestampSpec) {
+ return ListOffsetsRequest.MAX_TIMESTAMP;
+ }
+ return ListOffsetsRequest.LATEST_TIMESTAMP;
+ }
+
/**
* Get a sub level error when the request is in batch. If given key was not found,
* return an {@link IllegalArgumentException}.
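
A caller-side sketch of the downgrade behaviour implemented above: when a broker rejects v7, only the MAX_TIMESTAMP futures fail and the remaining specs are retried at an older version, so callers can handle the failure per partition. The helper name and the -1 sentinel here are illustrative only:

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.UnsupportedVersionException;

    final class MaxTimestampFallback {
        // Returns the offset, or -1 when the leader does not support ListOffsets v7.
        static long maxTimestampOffsetOrSentinel(ListOffsetsResult result, TopicPartition tp)
                throws InterruptedException, ExecutionException {
            try {
                return result.partitionResult(tp).get().offset();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof UnsupportedVersionException)
                    return -1L; // broker too old for OffsetSpec.maxTimestamp()
                throw e;
            }
        }
    }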
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
index 339e9cf..dcf9045 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java
@@ -25,6 +25,7 @@ public class OffsetSpec {
public static class EarliestSpec extends OffsetSpec { }
public static class LatestSpec extends OffsetSpec { }
+ public static class MaxTimestampSpec extends OffsetSpec { }
public static class TimestampSpec extends OffsetSpec {
private final long timestamp;
@@ -60,4 +61,13 @@ public class OffsetSpec {
return new TimestampSpec(timestamp);
}
+ /**
+ * Used to retrieve the offset with the largest timestamp of a partition.
+ * As message timestamps can be specified client side, this may not match
+ * the log end offset returned by LatestSpec.
+ */
+ public static OffsetSpec maxTimestamp() {
+ return new MaxTimestampSpec();
+ }
+
}
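
To make that javadoc concrete: because producers may assign timestamps, the record with the largest timestamp need not be the last record. A sketch under that assumption, assuming an empty single-partition topic (the topic name and timestamps mirror the new ListOffsetsIntegrationTest further down):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class ClientTimestampsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            byte[] payload = new byte[10];
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("foo", 0, 100L, null, payload)); // lands at offset 0
                producer.send(new ProducerRecord<>("foo", 0, 999L, null, payload)); // offset 1, largest timestamp
                producer.send(new ProducerRecord<>("foo", 0, 200L, null, payload)); // offset 2
            }
            // OffsetSpec.latest()       -> offset 3 (log end offset)
            // OffsetSpec.maxTimestamp() -> offset 1 (record with timestamp 999)
        }
    }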
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index adfaae9..3f7d31b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -978,7 +978,7 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
- .forConsumer(requireTimestamp, isolationLevel)
+ .forConsumer(requireTimestamp, isolationLevel, false)
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 658188d..6b7734a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.Errors;
public class ListOffsetsRequest extends AbstractRequest {
public static final long EARLIEST_TIMESTAMP = -2L;
public static final long LATEST_TIMESTAMP = -1L;
+ public static final long MAX_TIMESTAMP = -3L;
public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;
@@ -54,9 +55,11 @@ public class ListOffsetsRequest extends AbstractRequest {
return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
}
- public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
+ public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {
short minVersion = 0;
- if (isolationLevel == IsolationLevel.READ_COMMITTED)
+ if (requireMaxTimestamp)
+ minVersion = 7;
+ else if (isolationLevel == IsolationLevel.READ_COMMITTED)
minVersion = 2;
else if (requireTimestamp)
minVersion = 1;
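
The version gating above can be exercised directly; a small sketch mirroring the new ListOffsetsRequestTest case further down (all classes are from the patch itself):

    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.common.requests.ListOffsetsRequest;

    public class VersionGatingExample {
        public static void main(String[] args) {
            // requireMaxTimestamp=true pins the oldest negotiable version to 7, so a
            // broker that only speaks v6 or lower surfaces an UnsupportedVersionException.
            ListOffsetsRequest.Builder builder =
                ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true);
            System.out.println(builder.oldestAllowedVersion()); // prints 7
        }
    }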
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index a464c93..93c920e 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -30,7 +30,9 @@
// Version 5 is the same as version 4.
//
// Version 6 enables flexible versions.
- "validVersions": "0-6",
+ //
+ // Version 7 enables listing offsets by max timestamp (KIP-734).
+ "validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 727bb8f..6d6be0f 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -29,7 +29,9 @@
// Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
//
// Version 6 enables flexible versions.
- "validVersions": "0-6",
+ //
+ // Version 7 is the same as version 6 (KIP-734).
+ "validVersions": "0-7",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+",
"ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0013d33..4f6cddb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -172,6 +172,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.ListTransactionsRequest;
@@ -4032,6 +4033,7 @@ public class KafkaAdminClientTest {
pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new
Node[]{node0}));
pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new
Node[]{node0}));
pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new
Node[]{node0}));
+ pInfos.add(new PartitionInfo("qux", 0, node0, new Node[]{node0}, new
Node[]{node0}));
final Cluster cluster =
new Cluster(
"mockClusterId",
@@ -4044,6 +4046,7 @@ public class KafkaAdminClientTest {
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("bar", 0);
final TopicPartition tp2 = new TopicPartition("baz", 0);
+ final TopicPartition tp3 = new TopicPartition("qux", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -4053,15 +4056,17 @@ public class KafkaAdminClientTest {
ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
ListOffsetsTopicResponse t1 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 234L, 432);
ListOffsetsTopicResponse t2 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp2, Errors.NONE, 123456789L, 345L, 543);
+ ListOffsetsTopicResponse t3 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp3, Errors.NONE, 234567890L, 456L, 654);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
.setThrottleTimeMs(0)
- .setTopics(Arrays.asList(t0, t1, t2));
+ .setTopics(Arrays.asList(t0, t1, t2, t3));
env.kafkaClient().prepareResponse(new ListOffsetsResponse(responseData));
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.earliest());
partitions.put(tp2, OffsetSpec.forTimestamp(System.currentTimeMillis()));
+ partitions.put(tp3, OffsetSpec.maxTimestamp());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
@@ -4075,9 +4080,13 @@ public class KafkaAdminClientTest {
assertEquals(345L, offsets.get(tp2).offset());
assertEquals(543, offsets.get(tp2).leaderEpoch().get().intValue());
assertEquals(123456789L, offsets.get(tp2).timestamp());
+ assertEquals(456L, offsets.get(tp3).offset());
+ assertEquals(654, offsets.get(tp3).leaderEpoch().get().intValue());
+ assertEquals(234567890L, offsets.get(tp3).timestamp());
assertEquals(offsets.get(tp0), result.partitionResult(tp0).get());
assertEquals(offsets.get(tp1), result.partitionResult(tp1).get());
assertEquals(offsets.get(tp2), result.partitionResult(tp2).get());
+ assertEquals(offsets.get(tp3), result.partitionResult(tp3).get());
try {
result.partitionResult(new TopicPartition("unknown", 0)).get();
fail("should have thrown IllegalArgumentException");
@@ -4226,6 +4235,117 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+ // listoffsets response from broker 0
+ env.kafkaClient().prepareUnsupportedVersionResponse(
+ request -> request instanceof ListOffsetsRequest);
+
+ ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+ TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+ }
+ }
+
+ @Test
+ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+ pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+ final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+ // listoffsets response from broker 0
+ env.kafkaClient().prepareUnsupportedVersionResponse(
+ request -> request instanceof ListOffsetsRequest);
+
+ ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+ ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+ .setThrottleTimeMs(0)
+ .setTopics(Arrays.asList(topicResponse));
+ env.kafkaClient().prepareResponseFrom(
+ // ensure that no max timestamp requests are retried
+ request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+ .flatMap(t -> t.partitions().stream())
+ .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
+ new ListOffsetsResponse(responseData), node);
+
+ ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+ put(tp0, OffsetSpec.maxTimestamp());
+ put(tp1, OffsetSpec.latest());
+ }});
+
+ TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+ ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+ assertEquals(345L, tp1Offset.offset());
+ assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+ assertEquals(-1L, tp1Offset.timestamp());
+ }
+ }
+
+ @Test
+ public void testListOffsetsUnsupportedNonMaxTimestamp() {
+ Node node = new Node(0, "localhost", 8120);
+ List<Node> nodes = Collections.singletonList(node);
+ List<PartitionInfo> pInfos = new ArrayList<>();
+ pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+ final Cluster cluster = new Cluster(
+ "mockClusterId",
+ nodes,
+ pInfos,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ node);
+ final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+ // listoffsets response from broker 0
+ env.kafkaClient().prepareUnsupportedVersionResponse(
+ request -> request instanceof ListOffsetsRequest);
+
+ ListOffsetsResult result = env.adminClient().listOffsets(
+ Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+ TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+ }
+ }
+
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
return Utils.mkMap(
Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
false)),
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
index a7c83d6..83c4b10 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java
@@ -67,7 +67,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED)
+ .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
.setTargetTimes(topics)
.build(version);
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
@@ -100,7 +100,7 @@ public class ListOffsetsRequestTest {
new ListOffsetsPartition()
.setPartitionIndex(0))));
ListOffsetsRequest request = ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(topics)
.build((short) 0);
ListOffsetsResponse response = (ListOffsetsResponse) request.getErrorResponse(0, Errors.NOT_LEADER_OR_FOLLOWER.exception());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index fc709dc..0208c05 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1454,7 +1454,7 @@ public class RequestResponseTest {
.setMaxNumOffsets(10)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version == 1) {
@@ -1465,7 +1465,7 @@ public class RequestResponseTest {
.setTimestamp(1000000L)
.setCurrentLeaderEpoch(5)));
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
@@ -1478,7 +1478,7 @@ public class RequestResponseTest {
.setName("test")
.setPartitions(Arrays.asList(partition));
return ListOffsetsRequest.Builder
- .forConsumer(true, IsolationLevel.READ_COMMITTED)
+ .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
.setTargetTimes(Collections.singletonList(topic))
.build((short) version);
} else {
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 724bc98..d73b0a2 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -114,7 +114,9 @@ object ApiVersion {
// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
KAFKA_2_8_IV1,
// Introduce AllocateProducerIds (KIP-730)
- KAFKA_3_0_IV0
+ KAFKA_3_0_IV0,
+ // Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
+ KAFKA_3_0_IV1
)
// Map keys are the union of the short and full versions
@@ -458,6 +460,13 @@ case object KAFKA_3_0_IV0 extends DefaultApiVersion {
val id: Int = 33
}
+case object KAFKA_3_0_IV1 extends DefaultApiVersion {
+ val shortVersion: String = "3.0"
+ val subVersion = "IV1"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 34
+}
+
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 35ab50c..bccd2a3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1338,6 +1338,16 @@ class Log(@volatile private var _dir: File,
val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+ } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+ // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+ // constant time access while being safe to use with concurrent collections unlike `toArray`.
+ val segmentsCopy = logSegments.toBuffer
+ val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+ val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+ val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+ val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+ Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+ latestTimestampSegment.offsetOfMaxTimestampSoFar,
+ epochOptional))
} else {
// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 170b5b9..840c567 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val listOffsetRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV1) 7
+ else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
diff --git a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
new file mode 100644
index 0000000..c937030
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+ val topicName = "foo"
+ var adminClient: Admin = null
+
+ @BeforeEach
+ override def setUp(): Unit = {
+ super.setUp()
+ createTopic(topicName, 1, 1.toShort)
+ produceMessages()
+ adminClient = Admin.create(Map[String, Object](
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+ ).asJava)
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+ super.tearDown()
+ }
+
+ @Test
+ def testEarliestOffset(): Unit = {
+ val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+ assertEquals(0, earliestOffset.offset())
+ }
+
+ @Test
+ def testLatestOffset(): Unit = {
+ val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+ assertEquals(3, latestOffset.offset())
+ }
+
+ @Test
+ def testMaxTimestampOffset(): Unit = {
+ val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+ assertEquals(1, maxTimestampOffset.offset())
+ }
+
+ private def runFetchOffsets(adminClient: Admin,
+ offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+ val tp = new TopicPartition(topicName, 0)
+ adminClient.listOffsets(Map(
+ tp -> offsetSpec
+ ).asJava, new ListOffsetsOptions()).all().get().get(tp)
+ }
+
+ def produceMessages(): Unit = {
+ val records = Seq(
+ new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
+ null, new Array[Byte](10000)),
+ new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
+ null, new Array[Byte](10000)),
+ new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
+ null, new Array[Byte](10000)),
+ )
+ TestUtils.produceMessages(servers, records, -1)
+ }
+
+ def generateConfigs: Seq[KafkaConfig] =
+ TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+}
+
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c730c47..0204b89 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -354,7 +354,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def createListOffsetsRequest = {
- requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED).setTargetTimes(
+ requests.ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false).setTargetTimes(
List(new ListOffsetsTopic()
.setName(tp.topic)
.setPartitions(List(new ListOffsetsPartition()
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4a0674d..f515f13 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2059,6 +2059,35 @@ class LogTest {
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
}
+ @Test
+ def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+ val log = createLog(logDir, logConfig)
+
+ assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+ val firstTimestamp = mockTime.milliseconds
+ val leaderEpoch = 0
+ log.appendAsLeader(TestUtils.singletonRecords(
+ value = TestUtils.randomBytes(10),
+ timestamp = firstTimestamp),
+ leaderEpoch = leaderEpoch)
+
+ val secondTimestamp = firstTimestamp + 1
+ log.appendAsLeader(TestUtils.singletonRecords(
+ value = TestUtils.randomBytes(10),
+ timestamp = secondTimestamp),
+ leaderEpoch = leaderEpoch)
+
+ log.appendAsLeader(TestUtils.singletonRecords(
+ value = TestUtils.randomBytes(10),
+ timestamp = firstTimestamp),
+ leaderEpoch = leaderEpoch)
+
+ assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
+ log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
+ }
+
/**
* Test the Log truncate operations
*/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 399a7bd..f61688f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2080,7 +2080,7 @@ class KafkaApisTest {
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)
.setCurrentLeaderEpoch(currentLeaderEpoch.get)).asJava)).asJava
- val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
+ val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
val capturedResponse = expectNoThrottling(request)
@@ -3192,7 +3192,7 @@ class KafkaApisTest {
.setPartitions(List(new ListOffsetsPartition()
.setPartitionIndex(tp.partition)
.setTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP)).asJava)).asJava
- val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel)
+ val listOffsetRequest = ListOffsetsRequest.Builder.forConsumer(true, isolationLevel, false)
.setTargetTimes(targetTimes).build()
val request = buildRequest(listOffsetRequest)
val capturedResponse = expectNoThrottling(request)
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 6bbf0a0..1988ad6 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -43,7 +43,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setCurrentLeaderEpoch(0)).asJava)).asJava
val consumerRequest = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
.build()
@@ -80,6 +80,18 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
}
+ @Test
+ def testListOffsetsMaxTimeStampOldestVersion(): Unit = {
+ val consumerRequestBuilder = ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
+
+ val maxTimestampRequestBuilder = ListOffsetsRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, true)
+
+ assertEquals(0.toShort, consumerRequestBuilder.oldestAllowedVersion())
+ assertEquals(7.toShort, maxTimestampRequestBuilder.oldestAllowedVersion())
+ }
+
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
val listOffsetPartition = new ListOffsetsPartition()
.setPartitionIndex(partition.partition)
@@ -90,7 +102,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setName(topic)
.setPartitions(List(listOffsetPartition).asJava)).asJava
val request = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
.build()
assertResponseError(error, brokerId, request)
@@ -133,7 +145,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
.setTimestamp(timestamp)).asJava)).asJava
val builder = ListOffsetsRequest.Builder
- .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(targetTimes)
val request = if (version == -1) builder.build() else builder.build(version)
@@ -162,11 +174,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)
- TestUtils.generateAndProduceMessages(servers, topic, 10)
+ TestUtils.generateAndProduceMessages(servers, topic, 9)
+ TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+ assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
// Kill the first leader so that we can verify the epoch change when fetching the latest offset
killBroker(firstLeaderId)
@@ -185,6 +199,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
// The latest offset reflects the updated epoch
assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+ assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
}
@Test
@@ -192,7 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
val firstLeaderId = partitionToLeader(partition.partition)
- TestUtils.generateAndProduceMessages(servers, topic, 10)
+ TestUtils.generateAndProduceMessages(servers, topic, 9)
+ TestUtils.produceMessage(servers, topic, "test-10", System.currentTimeMillis() + 10L)
for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion) {
if (version == 0) {
@@ -203,10 +219,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
- } else if (version >= 4) {
+ } else if (version >= 4 && version <= 6) {
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+ assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+ assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+ } else if (version >= 7) {
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
+ assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index bf9ad3e..b742bfe 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -54,7 +54,7 @@ class LogOffsetTest extends BaseRequestTest {
@Test
def testGetOffsetsForUnknownTopic(): Unit = {
val topicPartition = new TopicPartition("foo", 0)
- val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ val request = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 10).asJava).build(0)
val response = sendListOffsetsRequest(request)
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, findPartition(response.topics.asScala, topicPartition).errorCode)
@@ -65,13 +65,7 @@ class LogOffsetTest extends BaseRequestTest {
def testGetOffsetsAfterDeleteRecords(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
-
- createTopic(topic, 1, 1)
-
- val logManager = server.getLogManager
- TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
- "Log for partition [topic,0] should be created")
- val log = logManager.getLog(topicPartition).get
+ val log = createTopicAndGetLog(topic, topicPartition)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -93,16 +87,51 @@ class LogOffsetTest extends BaseRequestTest {
}
@Test
- def testGetOffsetsBeforeLatestTime(): Unit = {
+ def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
val topic = "kafka-"
val topicPartition = new TopicPartition(topic, 0)
+ val log = createTopicAndGetLog(topic, topicPartition)
- createTopic(topic, 1, 1)
+ for (timestamp <- 0 until 20)
+ log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+ log.flush()
- val logManager = server.getLogManager
- TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
- s"Log for partition $topicPartition should be created")
- val log = logManager.getLog(topicPartition).get
+ log.updateHighWatermark(log.logEndOffset)
+
+ val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+ assertEquals(19L, firstOffset.get.offset)
+ assertEquals(19L, firstOffset.get.timestamp)
+
+ log.truncateTo(0)
+
+ val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+ assertEquals(0L, secondOffset.get.offset)
+ assertEquals(-1L, secondOffset.get.timestamp)
+ }
+
+ @Test
+ def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, 0)
+ val log = createTopicAndGetLog(topic, topicPartition)
+
+ for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
+ log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
+ log.flush()
+
+ log.updateHighWatermark(log.logEndOffset)
+
+ val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+ assertEquals(7L, log.logEndOffset)
+ assertEquals(5L, maxTimestampOffset.get.offset)
+ assertEquals(6L, maxTimestampOffset.get.timestamp)
+ }
+
+ @Test
+ def testGetOffsetsBeforeLatestTime(): Unit = {
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, 0)
+ val log = createTopicAndGetLog(topic, topicPartition)
for (_ <- 0 until 20)
log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
@@ -111,7 +140,7 @@ class LogOffsetTest extends BaseRequestTest {
val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.LATEST_TIMESTAMP, 15)
assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets)
- TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, server),
+ TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, 0, server),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.LATEST_TIMESTAMP, 15).asJava).build()
@@ -149,6 +178,20 @@ class LogOffsetTest extends BaseRequestTest {
assertFalse(offsetChanged)
}
+ @Test
+ def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+ val topic = "kafka-"
+ val topicPartition = new TopicPartition(topic, 0)
+ val log = createTopicAndGetLog(topic, topicPartition)
+
+ log.updateHighWatermark(log.logEndOffset)
+
+ val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+ assertEquals(0L, log.logEndOffset)
+ assertEquals(0L, maxTimestampOffset.get.offset)
+ assertEquals(-1L, maxTimestampOffset.get.timestamp)
+ }
+
@deprecated("legacyFetchOffsetsBefore", since = "")
@Test
def testGetOffsetsBeforeNow(): Unit = {
@@ -266,4 +309,13 @@ class LogOffsetTest extends BaseRequestTest {
.partitions.asScala.find(_.partitionIndex == tp.partition).get
}
+ private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
+ createTopic(topic, 1, 1)
+
+ val logManager = server.getLogManager
+ TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+ "Log for partition [topic,0] should be created")
+ logManager.getLog(topicPartition).get
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 757b82c..1f04f08 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
.setPartitionIndex(tp.partition)
.setTimestamp(0L)
.setCurrentLeaderEpoch(15)).asJava)
- ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.setTargetTimes(List(topic).asJava)
case ApiKeys.LEADER_AND_ISR =>
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 53bc88e..0802d87 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1207,17 +1207,18 @@ object TestUtils extends Logging {
values
}
- def produceMessage(servers: Seq[KafkaServer], topic: String, message: String,
+ def produceMessage(servers: Seq[KafkaServer], topic: String, message: String, timestamp: java.lang.Long = null,
deliveryTimeoutMs: Int = 30 * 1000, requestTimeoutMs: Int = 20 * 1000): Unit = {
val producer = createProducer(TestUtils.getBrokerListStrFromServers(servers),
deliveryTimeoutMs = deliveryTimeoutMs, requestTimeoutMs = requestTimeoutMs)
try {
- producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get
+ producer.send(new ProducerRecord(topic, null, timestamp, topic.getBytes, message.getBytes)).get
} finally {
producer.close()
}
}
+
def verifyTopicDeletion(zkClient: KafkaZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index 326916e..e6fc2dc 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -69,7 +69,7 @@ public class ListOffsetRequestBenchmark {
}
}
- this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ this.offsetRequest = ListOffsetsRequest.Builder.forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
.build(ApiKeys.LIST_OFFSETS.latestVersion());
}