This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bc1764f9ef7 [fix][client] Seek should be thread-safe (#20242)
bc1764f9ef7 is described below

commit bc1764f9ef71dd31e8cd61c7571e493442bc6395
Author: Kim, Joo Hyuk <[email protected]>
AuthorDate: Mon May 8 20:57:20 2023 +0900

    [fix][client] Seek should be thread-safe (#20242)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 109 ++++++++++++---------
 .../pulsar/client/impl/ConsumerImplTest.java       |  27 ++++-
 2 files changed, 86 insertions(+), 50 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 199e8a9ae71..6c64fe04069 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -208,6 +208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
+
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
@@ -251,10 +252,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-           ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer,
-           boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> 
subscribeFuture, MessageId startMessageId,
-           long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
-           boolean createTopicIfDoesNotExist) {
+                           ExecutorProvider executorProvider, int 
partitionIndex, boolean hasParentConsumer,
+                           boolean parentConsumerHasListener, 
CompletableFuture<Consumer<T>> subscribeFuture,
+                           MessageId startMessageId,
+                           long startMessageRollbackDurationInSec, Schema<T> 
schema,
+                           ConsumerInterceptors<T> interceptors,
+                           boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, conf.getReceiverQueueSize(), 
executorProvider, subscribeFuture, schema,
                 interceptors);
         this.consumerId = client.newConsumerId();
@@ -319,21 +322,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-                        new BackoffBuilder()
-                                
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                                        TimeUnit.NANOSECONDS)
-                                
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
-                                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                                .create(),
+                new BackoffBuilder()
+                        
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                                TimeUnit.NANOSECONDS)
+                        
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
+                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                        .create(),
                 this);
 
         this.topicName = TopicName.get(topic);
         if (this.topicName.isPersistent()) {
             this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
+                    new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
         } else {
             this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
+                    NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
         if (conf.getDeadLetterPolicy() != null) {
@@ -410,16 +413,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.error("[{}][{}] Failed to unsubscribe: {}", topic, 
subscription, e.getCause().getMessage());
                 setState(State.Ready);
                 unsubscribeFuture.completeExceptionally(
-                    PulsarClientException.wrap(e.getCause(),
-                        String.format("Failed to unsubscribe the subscription 
%s of topic %s",
-                            topicName.toString(), subscription)));
+                        PulsarClientException.wrap(e.getCause(),
+                                String.format("Failed to unsubscribe the 
subscription %s of topic %s",
+                                        topicName.toString(), subscription)));
                 return null;
             });
         } else {
             unsubscribeFuture.completeExceptionally(
-                new PulsarClientException(
-                    String.format("The client is not connected to the broker 
when unsubscribing the "
-                            + "subscription %s of the topic %s", subscription, 
topicName.toString())));
+                    new PulsarClientException(
+                            String.format("The client is not connected to the 
broker when unsubscribing the "
+                                    + "subscription %s of the topic %s", 
subscription, topicName.toString())));
         }
         return unsubscribeFuture;
     }
@@ -1400,7 +1403,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, 
MessageMetadata msgMetadata, MessageIdImpl msgId,
-            MessageIdData messageId, ClientCnx cnx) {
+                                        MessageIdData messageId, ClientCnx 
cnx) {
 
         // Lazy task scheduling to expire incomplete chunk message
         if (expireTimeOfIncompleteChunkedMessageMillis > 0 && 
expireChunkMessageTaskScheduled.compareAndSet(false,
@@ -1445,7 +1448,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             increaseAvailablePermits(cnx);
             if (expireTimeOfIncompleteChunkedMessageMillis > 0
                     && System.currentTimeMillis() > 
(msgMetadata.getPublishTime()
-                            + expireTimeOfIncompleteChunkedMessageMillis)) {
+                    + expireTimeOfIncompleteChunkedMessageMillis)) {
                 doAcknowledge(msgId, AckType.Individual, 
Collections.emptyMap(), null);
             } else {
                 trackMessage(msgId);
@@ -1636,7 +1639,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     protected void trackMessage(MessageId messageId) {
-            trackMessage(messageId, 0);
+        trackMessage(messageId, 0);
     }
 
     protected void trackMessage(MessageId messageId, int redeliveryCount) {
@@ -1777,7 +1780,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, 
MessageMetadata msgMetadata, ByteBuf payload,
-            ClientCnx currentCnx, boolean checkMaxMessageSize) {
+                                              ClientCnx currentCnx, boolean 
checkMaxMessageSize) {
         CompressionType compressionType = msgMetadata.getCompression();
         CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
         int uncompressedSize = msgMetadata.getUncompressedSize();
@@ -1830,7 +1833,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     private void discardCorruptedMessage(MessageIdData messageId, ClientCnx 
currentCnx,
-            ValidationError validationError) {
+                                         ValidationError validationError) {
         log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, 
subscription, messageId.getLedgerId(),
                 messageId.getEntryId());
         discardMessage(messageId, currentCnx, validationError);
@@ -2022,8 +2025,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     String originTopicNameStr = getOriginTopicNameStr(message);
                     TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                             
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
-                            .value(message.getData())
-                            .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
+                                    .value(message.getData())
+                                    .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
                     if (message.hasKey()) {
                         typedMessageBuilderNew.key(message.getKey());
                     }
@@ -2052,7 +2055,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 }
                                 result.complete(false);
                                 return null;
-                    });
+                            });
                 }
             }, internalPinnedExecutor).exceptionally(ex -> {
                 log.error("Dead letter producer exception with topic: {}", 
deadLetterPolicy.getDeadLetterTopic(), ex);
@@ -2151,9 +2154,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
+        if (!duringSeek.compareAndSet(false, true)) {
+            log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}",
+                    topic, subscription, seekBy);
+            seekFuture.cancel(true);
+            return seekFuture;
+        }
+
         MessageIdAdv originSeekMessageId = seekMessageId;
         seekMessageId = (MessageIdAdv) seekId;
-        duringSeek.set(true);
         log.info("[{}][{}] Seeking subscription to {}", topic, subscription, 
seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
@@ -2171,9 +2180,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
 
             seekFuture.completeExceptionally(
-                PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the 
topic %s to %s",
-                        subscription, topicName.toString(), seekBy)));
+                    PulsarClientException.wrap(e.getCause(),
+                            String.format("Failed to seek the subscription %s 
of the topic %s to %s",
+                                    subscription, topicName.toString(), 
seekBy)));
             return null;
         });
         return seekFuture;
@@ -2185,7 +2194,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return seekAsyncCheckState(seekBy).orElseGet(() -> {
             long requestId = client.newRequestId();
             return seekAsyncInternal(requestId, Commands.newSeek(consumerId, 
requestId, timestamp),
-                MessageId.earliest, seekBy);
+                    MessageId.earliest, seekBy);
         });
     }
 
@@ -2351,10 +2360,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     public CompletableFuture<GetLastMessageIdResponse> 
internalGetLastMessageIdAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
-                .failedFuture(new PulsarClientException.AlreadyClosedException(
-                    String.format("The consumer %s was already closed when the 
subscription %s of the topic %s "
-                            + "getting the last message id", consumerName, 
subscription, topicName.toString())));
-                }
+                    .failedFuture(new 
PulsarClientException.AlreadyClosedException(
+                            String.format("The consumer %s was already closed 
when the subscription %s of the topic %s "
+                                            + "getting the last message id", 
consumerName, subscription,
+                                    topicName.toString())));
+        }
 
         AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
         Backoff backoff = new BackoffBuilder()
@@ -2376,11 +2386,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (isConnected() && cnx != null) {
             if 
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
 {
                 future.completeExceptionally(
-                    new PulsarClientException.NotSupportedException(
-                        String.format("The command `GetLastMessageId` is not 
supported for the protocol version %d. "
-                                        + "The consumer is %s, topic %s, 
subscription %s",
-                                cnx.getRemoteEndpointProtocolVersion(),
-                                consumerName, topicName.toString(), 
subscription)));
+                        new PulsarClientException.NotSupportedException(
+                                String.format(
+                                        "The command `GetLastMessageId` is not 
supported for the protocol version %d. "
+                                                + "The consumer is %s, topic 
%s, subscription %s",
+                                        cnx.getRemoteEndpointProtocolVersion(),
+                                        consumerName, topicName.toString(), 
subscription)));
                 return;
             }
 
@@ -2399,31 +2410,31 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
-                        topic, subscription, lastMessageId.getLedgerId(), 
lastMessageId.getEntryId());
+                            topic, subscription, lastMessageId.getLedgerId(), 
lastMessageId.getEntryId());
                 }
 
                 MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
                         ? new MessageIdImpl(lastMessageId.getLedgerId(),
-                                lastMessageId.getEntryId(), 
lastMessageId.getPartition())
+                        lastMessageId.getEntryId(), 
lastMessageId.getPartition())
                         : new BatchMessageIdImpl(lastMessageId.getLedgerId(), 
lastMessageId.getEntryId(),
-                                lastMessageId.getPartition(), 
lastMessageId.getBatchIndex());
+                        lastMessageId.getPartition(), 
lastMessageId.getBatchIndex());
 
                 future.complete(new GetLastMessageIdResponse(lastMsgId, 
markDeletePosition));
             }).exceptionally(e -> {
                 log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
                 future.completeExceptionally(
-                    PulsarClientException.wrap(e.getCause(),
-                        String.format("The subscription %s of the topic %s 
gets the last message id was failed",
-                            subscription, topicName.toString())));
+                        PulsarClientException.wrap(e.getCause(),
+                                String.format("The subscription %s of the 
topic %s gets the last message id was failed",
+                                        subscription, topicName.toString())));
                 return null;
             });
         } else {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             if (nextDelay <= 0) {
                 future.completeExceptionally(
-                    new PulsarClientException.TimeoutException(
-                        String.format("The subscription %s of the topic %s 
could not get the last message id "
-                                + "withing configured timeout", subscription, 
topicName.toString())));
+                        new PulsarClientException.TimeoutException(
+                                String.format("The subscription %s of the 
topic %s could not get the last message id "
+                                        + "withing configured timeout", 
subscription, topicName.toString())));
                 return;
             }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 29d180f5f9a..ab11675f543 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -26,7 +27,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
@@ -259,4 +262,26 @@ public class ConsumerImplTest {
 
         assertThat(consumer.getPriorityLevel()).isEqualTo(1);
     }
+
+    @Test(invocationTimeOut = 1000)
+    public void testSeekAsyncInternal() {
+        // given
+        ClientCnx cnx = mock(ClientCnx.class);
+        CompletableFuture<ProducerResponse> clientReq = new 
CompletableFuture<>();
+        when(cnx.sendRequestWithId(any(ByteBuf.class), 
anyLong())).thenReturn(clientReq);
+
+        consumer.setClientCnx(cnx);
+        consumer.setState(HandlerState.State.Ready);
+
+        // when
+        CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
+        CompletableFuture<Void> secondResult = consumer.seekAsync(1L);
+
+        clientReq.complete(null);
+
+        // then
+        assertTrue(firstResult.isDone());
+        assertTrue(secondResult.isCancelled());
+        verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());
+    }
 }

Reply via email to