This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new c7f75ad KAFKA-13008: Try to refresh end offset when partitionLag
returns empty (#11057)
c7f75ad is described below
commit c7f75add6f3f71dc5cd00043136bfd13e2b0fd5d
Author: Guozhang Wang <[email protected]>
AuthorDate: Fri Jul 23 16:46:10 2021 -0700
KAFKA-13008: Try to refresh end offset when partitionLag returns empty
(#11057)
1. When listOffset result is retrieved inside Fetcher, check if the
partitions are part of the subscriptions of the consumer; if yes update the
corresponding LSO or HW based on the isolation level.
2. When partitionLag cannot return result since the log end offset (LSO/HW)
is not known, send an async list offset which would be completed by other calls
polling (also the hb thread may complete it as well), and hope the next
partitionLag would get the result.
3. Keep track of list-offset request sent at the subscription state level
so that frequent currentLag calls would not cause excessive list-offset
requests.
Then on the streams side, the first partitionLag would still return empty,
but soon enough the subsequent partitionLag should return data and we would not
wait for the fetch response to update fetched state.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, John Roesler
<[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 19 ++++++-
.../kafka/clients/consumer/internals/Fetcher.java | 59 +++++++++++++++++-----
.../consumer/internals/SubscriptionState.java | 33 +++++++++++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 50 +++++++++++++++++-
.../clients/consumer/internals/FetcherTest.java | 17 ++++++-
5 files changed, 162 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 992edc6..29a9e37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2237,7 +2237,24 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
acquireAndEnsureOpen();
try {
final Long lag = subscriptions.partitionLag(topicPartition,
isolationLevel);
- return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+
+ // if the log end offset is not known and hence cannot return lag
and there is
+ // no in-flight list offset request yet,
+ // issue a list offset request for that partition so that next time
+ // we may get the answer; we do not need to wait for the return
value
+ // since we would not try to poll the network client synchronously
+ if (lag == null) {
+ if (subscriptions.partitionEndOffset(topicPartition,
isolationLevel) == null &&
+
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
+ log.info("Requesting the log end offset for {} in order to
compute lag", topicPartition);
+ subscriptions.requestPartitionEndOffset(topicPartition);
+ fetcher.endOffsets(Collections.singleton(topicPartition),
time.timer(0L));
+ }
+
+ return OptionalLong.empty();
+ }
+
+ return OptionalLong.of(lag);
} finally {
release();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f4eb0d8..5c5287c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -161,7 +161,6 @@ public class Fetcher<K, V> implements Closeable {
private final ApiVersions apiVersions;
private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
-
private CompletedFetch nextInLineFetch = null;
public Fetcher(LogContext logContext,
@@ -540,19 +539,54 @@ public class Fetcher<K, V> implements Closeable {
Map<TopicPartition, Long> remainingToSearch = new
HashMap<>(timestampsToSearch);
do {
RequestFuture<ListOffsetResult> future =
sendListOffsetsRequests(remainingToSearch, requireTimestamps);
+
+ future.addListener(new RequestFutureListener<ListOffsetResult>() {
+ @Override
+ public void onSuccess(ListOffsetResult value) {
+ synchronized (future) {
+ result.fetchedOffsets.putAll(value.fetchedOffsets);
+
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
+
+ for (final Map.Entry<TopicPartition, ListOffsetData>
entry: value.fetchedOffsets.entrySet()) {
+ final TopicPartition partition = entry.getKey();
+
+ // if the interested partitions are part of the
subscriptions, use the returned offset to update
+ // the subscription state as well:
+ // * with read-committed, the returned offset
would be LSO;
+ // * with read-uncommitted, the returned offset
would be HW;
+ if (subscriptions.isAssigned(partition)) {
+ final long offset = entry.getValue().offset;
+ if (isolationLevel ==
IsolationLevel.READ_COMMITTED) {
+ log.trace("Updating last stable offset for
partition {} to {}", partition, offset);
+
subscriptions.updateLastStableOffset(partition, offset);
+ } else {
+ log.trace("Updating high watermark for
partition {} to {}", partition, offset);
+
subscriptions.updateHighWatermark(partition, offset);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ if (!(e instanceof RetriableException)) {
+ throw future.exception();
+ }
+ }
+ });
+
+ // if timeout is set to zero, do not try to poll the network
client at all
+ // and return empty immediately; otherwise try to get the results
synchronously
+ // and throw timeout exception if cannot complete in time
+ if (timer.timeoutMs() == 0L)
+ return result;
+
client.poll(future, timer);
if (!future.isDone()) {
break;
- } else if (future.succeeded()) {
- ListOffsetResult value = future.value();
- result.fetchedOffsets.putAll(value.fetchedOffsets);
- remainingToSearch.keySet().retainAll(value.partitionsToRetry);
- } else if (!future.isRetriable()) {
- throw future.exception();
- }
-
- if (remainingToSearch.isEmpty()) {
+ } else if (remainingToSearch.isEmpty()) {
return result;
} else {
client.awaitMetadataUpdate(timer);
@@ -894,8 +928,9 @@ public class Fetcher<K, V> implements Closeable {
final AtomicInteger remainingResponses = new
AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry
: timestampsToSearchByNode.entrySet()) {
- RequestFuture<ListOffsetResult> future =
- sendListOffsetRequest(entry.getKey(), entry.getValue(),
requireTimestamps);
+ // we skip sending the list offset request only if there's already
one with the exact
+ // requested offsets for the destination node
+ RequestFuture<ListOffsetResult> future =
sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@Override
public void onSuccess(ListOffsetResult partialResult) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 3e53868..2dd587f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -548,6 +548,25 @@ public class SubscriptionState {
}
}
+ public synchronized Long partitionEndOffset(TopicPartition tp,
IsolationLevel isolationLevel) {
+ TopicPartitionState topicPartitionState = assignedState(tp);
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ return topicPartitionState.lastStableOffset;
+ } else {
+ return topicPartitionState.highWatermark;
+ }
+ }
+
+ public synchronized void requestPartitionEndOffset(TopicPartition tp) {
+ TopicPartitionState topicPartitionState = assignedState(tp);
+ topicPartitionState.requestEndOffset();
+ }
+
+ public synchronized boolean partitionEndOffsetRequested(TopicPartition tp)
{
+ TopicPartitionState topicPartitionState = assignedState(tp);
+ return topicPartitionState.endOffsetRequested();
+ }
+
synchronized Long partitionLead(TopicPartition tp) {
TopicPartitionState topicPartitionState = assignedState(tp);
return topicPartitionState.logStartOffset == null ? null :
topicPartitionState.position.offset - topicPartitionState.logStartOffset;
@@ -753,9 +772,11 @@ public class SubscriptionState {
private Long nextRetryTimeMs;
private Integer preferredReadReplica;
private Long preferredReadReplicaExpireTimeMs;
-
+ private boolean endOffsetRequested;
+
TopicPartitionState() {
this.paused = false;
+ this.endOffsetRequested = false;
this.fetchState = FetchStates.INITIALIZING;
this.position = null;
this.highWatermark = null;
@@ -766,6 +787,14 @@ public class SubscriptionState {
this.preferredReadReplica = null;
}
+ public boolean endOffsetRequested() {
+ return endOffsetRequested;
+ }
+
+ public void requestEndOffset() {
+ endOffsetRequested = true;
+ }
+
private void transitionState(FetchState newState, Runnable
runIfTransitioned) {
FetchState nextState = this.fetchState.transitionTo(newState);
if (nextState.equals(newState)) {
@@ -946,6 +975,7 @@ public class SubscriptionState {
private void highWatermark(Long highWatermark) {
this.highWatermark = highWatermark;
+ this.endOffsetRequested = false;
}
private void logStartOffset(Long logStartOffset) {
@@ -954,6 +984,7 @@ public class SubscriptionState {
private void lastStableOffset(Long lastStableOffset) {
this.lastStableOffset = lastStableOffset;
+ this.endOffsetRequested = false;
}
private OffsetResetStrategy resetStrategy() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ab8a950..326eaa4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2156,16 +2156,35 @@ public class KafkaConsumerTest {
consumer.assign(singleton(tp0));
+ // poll once to update with the current metadata
+ consumer.poll(Duration.ofMillis(0));
+ client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
+
// no error for no current position
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ assertEquals(0, client.inFlightRequestCount());
+ // poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
+ consumer.poll(Duration.ofMillis(0));
+ // requests: list-offset, fetch
+ assertEquals(2, client.inFlightRequestCount());
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
+ // poll once again, which should return the list-offset response
+ // and hence next call would return correct lag result
+ client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ consumer.poll(Duration.ofMillis(0));
+
+ assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+ // requests: fetch
+ assertEquals(1, client.inFlightRequestCount());
+
+ // one successful fetch should update the log end offset and the
position
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
- client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo)));
+ client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
final ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count());
@@ -2177,6 +2196,35 @@ public class KafkaConsumerTest {
consumer.close(Duration.ZERO);
}
+ @Test
+ public void testListOffsetShouldUpdateSubscriptions() {
+ final Time time = new MockTime();
+ final SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
+ final ConsumerMetadata metadata = createMetadata(subscription);
+ final MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, singletonMap(topic, 1));
+ final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
+
+ final KafkaConsumer<String, String> consumer =
+ newConsumer(time, client, subscription, metadata, assignor,
true, groupInstanceId);
+
+ consumer.assign(singleton(tp0));
+
+ // poll once to update with the current metadata
+ consumer.poll(Duration.ofMillis(0));
+ client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, metadata.fetch().nodes().get(0)));
+
+ consumer.seek(tp0, 50L);
+ client.prepareResponse(listOffsetsResponse(singletonMap(tp0, 90L)));
+
+ assertEquals(singletonMap(tp0, 90L),
consumer.endOffsets(Collections.singleton(tp0)));
+ // correct lag result should be returned as well
+ assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+
+ consumer.close(Duration.ZERO);
+ }
+
private KafkaConsumer<String, String>
consumerWithPendingAuthenticationError() {
Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 03a2279..e3abe56 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2771,6 +2771,21 @@ public class FetcherTest {
}
@Test
+ public void testListOffsetsWithZeroTimeout() {
+ buildFetcher();
+
+ Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
+ offsetsToSearch.put(tp0, ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ offsetsToSearch.put(tp1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
+
+ Map<TopicPartition, Long> offsetsToExpect = new HashMap<>();
+ offsetsToExpect.put(tp0, null);
+ offsetsToExpect.put(tp1, null);
+
+ assertEquals(offsetsToExpect, fetcher.offsetsForTimes(offsetsToSearch,
time.timer(0)));
+ }
+
+ @Test
public void testBatchedListOffsetsMetadataErrors() {
buildFetcher();
@@ -2795,7 +2810,7 @@ public class FetcherTest {
offsetsToSearch.put(tp0, ListOffsetsRequest.EARLIEST_TIMESTAMP);
offsetsToSearch.put(tp1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
- assertThrows(TimeoutException.class, () ->
fetcher.offsetsForTimes(offsetsToSearch, time.timer(0)));
+ assertThrows(TimeoutException.class, () ->
fetcher.offsetsForTimes(offsetsToSearch, time.timer(1)));
}
@Test