This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 650d66c356f [fix][client] thread-safe seek
650d66c356f is described below
commit 650d66c356f7d1addf1048051c4aa9ad1fa7eae0
Author: tison <[email protected]>
AuthorDate: Mon May 15 15:10:11 2023 +0800
[fix][client] thread-safe seek
Signed-off-by: tison <[email protected]>
(cherry picked from commit 0acf8f89d6057170e990b128936175fc5dd33be3)
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 ++++++++-
.../pulsar/client/impl/ConsumerImplTest.java | 27 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 199e8a9ae71..2dd245c10f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2151,9 +2151,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
+ if (!duringSeek.compareAndSet(false, true)) {
+ final String message = String.format(
+ "[%s][%s] attempting to seek operation that is already in
progress (seek by %s)",
+ topic, subscription, seekBy);
+ log.warn("[{}][{}] Attempting to seek operation that is already in
progress, cancelling {}",
+ topic, subscription, seekBy);
+ seekFuture.completeExceptionally(new
IllegalStateException(message));
+ return seekFuture;
+ }
+
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
- duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription,
seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 29d180f5f9a..5a223d5da15 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -26,7 +27,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
@@ -259,4 +262,26 @@ public class ConsumerImplTest {
assertThat(consumer.getPriorityLevel()).isEqualTo(1);
}
+
+ @Test(invocationTimeOut = 1000)
+ public void testSeekAsyncInternal() {
+ // given
+ ClientCnx cnx = mock(ClientCnx.class);
+ CompletableFuture<ProducerResponse> clientReq = new
CompletableFuture<>();
+ when(cnx.sendRequestWithId(any(ByteBuf.class),
anyLong())).thenReturn(clientReq);
+
+ consumer.setClientCnx(cnx);
+ consumer.setState(HandlerState.State.Ready);
+
+ // when
+ CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
+ CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
+
+ clientReq.complete(null);
+
+ // then
+ assertTrue(firstResult.isDone());
+ assertTrue(secondResult.isCompletedExceptionally());
+ verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
+ }
}