This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 650d66c356f [fix][client] thread-safe seek
650d66c356f is described below

commit 650d66c356f7d1addf1048051c4aa9ad1fa7eae0
Author: tison <[email protected]>
AuthorDate: Mon May 15 15:10:11 2023 +0800

    [fix][client] thread-safe seek
    
    Signed-off-by: tison <[email protected]>
    (cherry picked from commit 0acf8f89d6057170e990b128936175fc5dd33be3)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++++++++-
 .../pulsar/client/impl/ConsumerImplTest.java       | 27 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 199e8a9ae71..2dd245c10f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2151,9 +2151,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
+        if (!duringSeek.compareAndSet(false, true)) {
+            final String message = String.format(
+                    "[%s][%s] attempting to seek operation that is already in progress (seek by %s)",
+                    topic, subscription, seekBy);
+            log.warn("[{}][{}] Attempting to seek operation that is already in progress, cancelling {}",
+                    topic, subscription, seekBy);
+            seekFuture.completeExceptionally(new IllegalStateException(message));
+            return seekFuture;
+        }
+
         MessageIdAdv originSeekMessageId = seekMessageId;
         seekMessageId = (MessageIdAdv) seekId;
-        duringSeek.set(true);
         log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 29d180f5f9a..5a223d5da15 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -26,7 +27,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
@@ -259,4 +262,26 @@ public class ConsumerImplTest {
 
         assertThat(consumer.getPriorityLevel()).isEqualTo(1);
     }
+
+    @Test(invocationTimeOut = 1000)
+    public void testSeekAsyncInternal() {
+        // given
+        ClientCnx cnx = mock(ClientCnx.class);
+        CompletableFuture<ProducerResponse> clientReq = new CompletableFuture<>();
+        when(cnx.sendRequestWithId(any(ByteBuf.class), anyLong())).thenReturn(clientReq);
+
+        consumer.setClientCnx(cnx);
+        consumer.setState(HandlerState.State.Ready);
+
+        // when
+        CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
+        CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
+
+        clientReq.complete(null);
+
+        // then
+        assertTrue(firstResult.isDone());
+        assertTrue(secondResult.isCompletedExceptionally());
+        verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
+    }
 }

Reply via email to