This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f4c25fe1d8 KAFKA-17448: New consumer seek should update positions in 
background thread (#17075)
3f4c25fe1d8 is described below

commit 3f4c25fe1d800abafa9df81ccaa42b1b3921c824
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Sep 12 02:08:33 2024 +0800

    KAFKA-17448: New consumer seek should update positions in background thread 
(#17075)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True 
<[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 22 ++++----
 .../internals/events/ApplicationEvent.java         |  3 +-
 .../events/ApplicationEventProcessor.java          | 18 +++++++
 .../internals/events/SeekUnvalidatedEvent.java     | 59 ++++++++++++++++++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 25 +++++++++
 .../events/ApplicationEventProcessorTest.java      | 33 ++++++++++++
 6 files changed, 147 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e988afa10c7..06ee3bc3616 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
-import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -56,6 +55,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -790,11 +790,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             log.info("Seeking to offset {} for partition {}", offset, 
partition);
-            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
-                offset,
-                Optional.empty(), // This will ensure we skip validation
-                metadata.currentLeader(partition));
-            subscriptions.seekUnvalidated(partition, newPosition);
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new 
SeekUnvalidatedEvent(
+                    calculateDeadlineMs(timer), partition, offset, 
Optional.empty());
+            applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
         } finally {
             release();
         }
@@ -815,13 +814,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             } else {
                 log.info("Seeking to offset {} for partition {}", offset, 
partition);
             }
-            Metadata.LeaderAndEpoch currentLeaderAndEpoch = 
metadata.currentLeader(partition);
-            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
-                offsetAndMetadata.offset(),
-                offsetAndMetadata.leaderEpoch(),
-                currentLeaderAndEpoch);
             updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
-            subscriptions.seekUnvalidated(partition, newPosition);
+
+            Timer timer = time.timer(defaultApiTimeoutMs);
+            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new 
SeekUnvalidatedEvent(
+                    calculateDeadlineMs(timer), partition, 
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
+            applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
         } finally {
             release();
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index a31f458e5e0..4b0584b5bb8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -35,7 +35,8 @@ public abstract class ApplicationEvent {
         COMMIT_ON_CLOSE,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
-        SHARE_ACKNOWLEDGE_ON_CLOSE
+        SHARE_ACKNOWLEDGE_ON_CLOSE,
+        SEEK_UNVALIDATED,
     }
 
     private final Type type;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index b28750ac7fb..6ce8737c78a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -147,6 +147,10 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((ShareAcknowledgeOnCloseEvent) event);
                 return;
 
+            case SEEK_UNVALIDATED:
+                process((SeekUnvalidatedEvent) event);
+                return;
+
             default:
                 log.warn("Application event type {} was not expected", 
event.type());
         }
@@ -409,4 +413,18 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             }
         };
     }
+
+    private void process(final SeekUnvalidatedEvent event) {
+        try {
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
+                    event.offset(),
+                    event.offsetEpoch(),
+                    metadata.currentLeader(event.partition())
+            );
+            subscriptions.seekUnvalidated(event.partition(), newPosition);
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
new file mode 100644
index 00000000000..d3f4cedff7f
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Optional;
+
+/**
+ * Event to perform {@link SubscriptionState#seekUnvalidated(TopicPartition, 
SubscriptionState.FetchPosition)}
+ * in the background thread. This can avoid race conditions when subscription 
state is updated.
+ */
+public class SeekUnvalidatedEvent extends CompletableApplicationEvent<Void> {
+    private final TopicPartition partition;
+    private final long offset;
+    private final Optional<Integer> offsetEpoch;
+
+    public SeekUnvalidatedEvent(long deadlineMs, TopicPartition partition, 
long offset, Optional<Integer> offsetEpoch) {
+        super(Type.SEEK_UNVALIDATED, deadlineMs);
+        this.partition = partition;
+        this.offset = offset;
+        this.offsetEpoch = offsetEpoch;
+    }
+
+    public TopicPartition partition() {
+        return partition;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public Optional<Integer> offsetEpoch() {
+        return offsetEpoch;
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase()
+                + ", partition=" + partition
+                + ", offset=" + offset
+                + offsetEpoch.map(integer -> ", offsetEpoch=" + 
integer).orElse("");
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ae694c12a09..2526b6447a4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -45,6 +45,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
@@ -769,6 +770,7 @@ public class AsyncKafkaConsumerTest {
                 new Node(1, "host", 9000)), Optional.of(1)));
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Arrays.asList(t0, t1));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(t0, 10);
         consumer.seek(t1, 20);
 
@@ -804,6 +806,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
 
         assertDoesNotThrow(() -> consumer.commitAsync());
@@ -827,6 +830,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
 
         assertDoesNotThrow(() -> consumer.commitAsync());
@@ -894,6 +898,7 @@ public class AsyncKafkaConsumerTest {
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         consumer.commitAsync();
 
@@ -925,6 +930,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
 
         assertDoesNotThrow(() -> consumer.commitAsync());
@@ -1018,6 +1024,7 @@ public class AsyncKafkaConsumerTest {
             "client-id");
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
+        completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.commitSyncAllConsumed(time.timer(100));
         verify(applicationEventHandler).add(any(SyncCommitEvent.class));
@@ -1035,6 +1042,7 @@ public class AsyncKafkaConsumerTest {
             "client-id");
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
+        completeSeekUnvalidatedEventSuccessfully();
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         verify(applicationEventHandler, 
never()).add(any(SyncCommitEvent.class));
     }
@@ -1293,6 +1301,7 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
         
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 10);
         consumer.wakeup();
 
@@ -1322,6 +1331,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
 
         consumer.commitAsync();
@@ -1340,6 +1350,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("foo", 0);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(tp));
+        completeSeekUnvalidatedEventSuccessfully();
         consumer.seek(tp, 20);
         completeCommitAsyncApplicationEventSuccessfully();
         consumer.commitAsync(cb);
@@ -2187,6 +2198,20 @@ public class AsyncKafkaConsumerTest {
         
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
     }
 
+    private void completeSeekUnvalidatedEventSuccessfully() {
+        doAnswer(invocation -> {
+            SeekUnvalidatedEvent event = invocation.getArgument(0);
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
+                    event.offset(),
+                    event.offsetEpoch(),
+                    metadata.currentLeader(event.partition())
+            );
+            consumer.subscriptions().seekUnvalidated(event.partition(), 
newPosition);
+            event.future().complete(null);
+            return null;
+        
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
+    }
+
     private void forceCommitCallbackInvocation() {
         // Invokes callback
         consumer.commitAsync();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index df90c43bf4a..84a7ac84d1c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import 
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
@@ -53,6 +54,8 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -183,6 +186,36 @@ public class ApplicationEventProcessorTest {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }
 
+    @Test
+    public void testSeekUnvalidatedEvent() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        SubscriptionState.FetchPosition position = new 
SubscriptionState.FetchPosition(
+                0, Optional.empty(), 
Metadata.LeaderAndEpoch.noLeaderOrEpoch());
+        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, 
Optional.empty());
+
+        setupProcessor(false);
+        
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+        doNothing().when(subscriptionState).seekUnvalidated(eq(tp), any());
+        processor.process(event);
+        verify(metadata).currentLeader(tp);
+        verify(subscriptionState).seekUnvalidated(tp, position);
+        assertDoesNotThrow(() -> event.future().get());
+    }
+
+    @Test
+    public void testSeekUnvalidatedEventWithException() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        SeekUnvalidatedEvent event = new SeekUnvalidatedEvent(12345, tp, 0, 
Optional.empty());
+
+        setupProcessor(false);
+        
doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(tp);
+        doThrow(new 
IllegalStateException()).when(subscriptionState).seekUnvalidated(eq(tp), any());
+        processor.process(event);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> 
event.future().get());
+        assertInstanceOf(IllegalStateException.class, e.getCause());
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }

Reply via email to