This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f4c25fe1d8 KAFKA-17448: New consumer seek should update positions in background thread (#17075)
3f4c25fe1d8 is described below
commit 3f4c25fe1d800abafa9df81ccaa42b1b3921c824
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Sep 12 02:08:33 2024 +0800
KAFKA-17448: New consumer seek should update positions in background thread (#17075)
Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 22 ++++----
.../internals/events/ApplicationEvent.java | 3 +-
.../events/ApplicationEventProcessor.java | 18 +++++++
.../internals/events/SeekUnvalidatedEvent.java | 59 ++++++++++++++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 25 +++++++++
.../events/ApplicationEventProcessorTest.java | 33 ++++++++++++
6 files changed, 147 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e988afa10c7..06ee3bc3616 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -56,6 +55,7 @@ import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -790,11 +790,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
log.info("Seeking to offset {} for partition {}", offset,
partition);
- SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
- offset,
- Optional.empty(), // This will ensure we skip validation
- metadata.currentLeader(partition));
- subscriptions.seekUnvalidated(partition, newPosition);
+ Timer timer = time.timer(defaultApiTimeoutMs);
+ SeekUnvalidatedEvent seekUnvalidatedEventEvent = new
SeekUnvalidatedEvent(
+ calculateDeadlineMs(timer), partition, offset,
Optional.empty());
+ applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
@@ -815,13 +814,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
} else {
log.info("Seeking to offset {} for partition {}", offset,
partition);
}
- Metadata.LeaderAndEpoch currentLeaderAndEpoch =
metadata.currentLeader(partition);
- SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
- offsetAndMetadata.offset(),
- offsetAndMetadata.leaderEpoch(),
- currentLeaderAndEpoch);
updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
- subscriptions.seekUnvalidated(partition, newPosition);
+
+ Timer timer = time.timer(defaultApiTimeoutMs);
+ SeekUnvalidatedEvent seekUnvalidatedEventEvent = new
SeekUnvalidatedEvent(
+ calculateDeadlineMs(timer), partition,
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
+ applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index a31f458e5e0..4b0584b5bb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -35,7 +35,8 @@ public abstract class ApplicationEvent {
COMMIT_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
- SHARE_ACKNOWLEDGE_ON_CLOSE
+ SHARE_ACKNOWLEDGE_ON_CLOSE,
+ SEEK_UNVALIDATED,
}
private final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index b28750ac7fb..6ce8737c78a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -147,6 +147,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((ShareAcknowledgeOnCloseEvent) event);
return;
+ case SEEK_UNVALIDATED:
+ process((SeekUnvalidatedEvent) event);
+ return;
+
default:
log.warn("Application event type {} was not expected",
event.type());
}
@@ -409,4 +413,18 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
};
}
+
+ private void process(final SeekUnvalidatedEvent event) {
+ try {
+ SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
+ event.offset(),
+ event.offsetEpoch(),
+ metadata.currentLeader(event.partition())
+ );
+ subscriptions.seekUnvalidated(event.partition(), newPosition);
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
new file mode 100644
index 00000000000..d3f4cedff7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Optional;
+
+/**
+ * Event to perform {@link SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)}
+ * in the background thread. This can avoid race conditions when subscription state is updated.
+ */
+public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> {
+ private final TopicPartition partition;
+ private final long offset;
+ private final Optional<Integer> offsetEpoch;
+
+ public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition,
long offset, Optional<Integer> offsetEpoch) {
+ super(Type.SEEK_UNVALIDATED, deadlineMs);
+ this.partition = partition;
+ this.offset = offset;
+ this.offsetEpoch = offsetEpoch;
+ }
+
+ public TopicPartition partition() {
+ return partition;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public Optional<Integer> offsetEpoch() {
+ return offsetEpoch;
+ }
+
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase()
+ + ", partition=" + partition
+ + ", offset=" + offset
+ + offsetEpoch.map(integer -> ", offsetEpoch=" +
integer).orElse("");
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ae694c12a09..2526b6447a4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -45,6 +45,7 @@ import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
@@ -769,6 +770,7 @@ public class AsyncKafkaConsumerTest {
new Node(1, "host", 9000)), Optional.of(1)));
completeAssignmentChangeEventSuccessfully();
consumer.assign(Arrays.asList(t0, t1));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(t0, 10);
consumer.seek(t1, 20);
@@ -804,6 +806,7 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -827,6 +830,7 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -894,6 +898,7 @@ public class AsyncKafkaConsumerTest {
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
consumer.commitAsync();
@@ -925,6 +930,7 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
assertDoesNotThrow(() -> consumer.commitAsync());
@@ -1018,6 +1024,7 @@ public class AsyncKafkaConsumerTest {
"client-id");
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
+ completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
@@ -1035,6 +1042,7 @@ public class AsyncKafkaConsumerTest {
"client-id");
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
+ completeSeekUnvalidatedEventSuccessfully();
subscriptions.seek(new TopicPartition("topic", 0), 100);
verify(applicationEventHandler,
never()).add(any(SyncCommitEvent.class));
}
@@ -1293,6 +1301,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 10);
consumer.wakeup();
@@ -1322,6 +1331,7 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
consumer.commitAsync();
@@ -1340,6 +1350,7 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("foo", 0);
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(tp));
+ completeSeekUnvalidatedEventSuccessfully();
consumer.seek(tp, 20);
completeCommitAsyncApplicationEventSuccessfully();
consumer.commitAsync(cb);
@@ -2187,6 +2198,20 @@ public class AsyncKafkaConsumerTest {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
}
+ private void completeSeekUnvalidatedEventSuccessfully() {
+ doAnswer(invocation -> {
+ SeekUnvalidatedEvent event = invocation.getArgument(0);
+ SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
+ event.offset(),
+ event.offsetEpoch(),
+ metadata.currentLeader(event.partition())
+ );
+ consumer.subscriptions().seekUnvalidated(event.partition(),
newPosition);
+ event.future().complete(null);
+ return null;
+
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
+ }
+
private void forceCommitCallbackInvocation() {
// Invokes callback
consumer.commitAsync();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index df90c43bf4a..84a7ac84d1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
@@ -53,6 +54,8 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -183,6 +186,36 @@ public class ApplicationEventProcessorTest {
assertInstanceOf(IllegalStateException.class, e.getCause());
}
+ @Test
+ public void testSeekUnvalidatedEvent() {
+ TopicPartition tp = new TopicPartition("topic", 0);
+ SubscriptionState.FetchPosition position = new
SubscriptionState.FetchPosition(
+ 0, Optional.empty(),
Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+ SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0,
Optional.empty());
+
+ setupProcessor(false);
+
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+ doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
+ processor.process(event);
+ verify(metadata).currentLeader(tp);
+ verify(subscriptionState).seekUnvalidated(tp, position);
+ assertDoesNotThrow(() -> event.future().get());
+ }
+
+ @Test
+ public void testSeekUnvalidatedEventWithException() {
+ TopicPartition tp = new TopicPartition("topic", 0);
+ SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0,
Optional.empty());
+
+ setupProcessor(false);
+
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+ doThrow(new
IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any());
+ processor.process(event);
+
+ ExecutionException e = assertThrows(ExecutionException.class, () ->
event.future().get());
+ assertInstanceOf(IllegalStateException.class, e.getCause());
+ }
+
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}