This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 318ff3398d39fa16010a664c747adbe77fd44d20
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Mar 7 11:32:18 2024 +0800

    [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest (#22201)
    
    (cherry picked from commit 95a53f3a033c4e57db2ddb2d1f0e9a4bc8b9f441)
---
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  27 ++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 104 ++++++++++++++-------
 .../pulsar/client/impl/ConsumerImplTest.java       |   2 +-
 3 files changed, 96 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 2f91d792581..d511c6dc37f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -66,6 +66,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -761,4 +762,30 @@ public class ReaderTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
         admin.topics().delete(topic, false);
     }
+
+    @DataProvider
+    public static Object[][] initializeLastMessageIdInBroker() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "initializeLastMessageIdInBroker")
+    public void testHasMessageAvailableAfterSeek(boolean 
initializeLastMessageIdInBroker) throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/test-has-message-available-after-seek";
+        @Cleanup Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
+                .startMessageId(MessageId.earliest).create();
+
+        @Cleanup Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        producer.send("msg");
+
+        if (initializeLastMessageIdInBroker) {
+            assertTrue(reader.hasMessageAvailable());
+        } // else: lastMessageIdInBroker is earliest
+
+        reader.seek(MessageId.latest);
+        // lastMessageIdInBroker is the last message ID, while startMessageId is still earliest
+        assertFalse(reader.hasMessageAvailable());
+
+        producer.send("msg");
+        assertTrue(reader.hasMessageAvailable());
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 717355abbef..dae53c59d6e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -166,7 +166,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private volatile MessageIdAdv startMessageId;
 
     private volatile MessageIdAdv seekMessageId;
-    private final AtomicBoolean duringSeek;
+    @VisibleForTesting
+    final AtomicReference<SeekStatus> seekStatus;
+    private volatile CompletableFuture<Void> seekFuture;
 
     private final MessageIdAdv initialStartMessageId;
 
@@ -296,7 +298,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             stats = ConsumerStatsDisabled.INSTANCE;
         }
 
-        duringSeek = new AtomicBoolean(false);
+        seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);
 
         // Create msgCrypto if not created already
         if (conf.getCryptoKeyReader() != null) {
@@ -771,7 +773,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
-            clearReceiverQueue();
+            clearReceiverQueue(false);
             return CompletableFuture.completedFuture(null);
         }
 
@@ -779,7 +781,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 topic, subscription, cnx.ctx().channel(), consumerId);
 
         long requestId = client.newRequestId();
-        if (duringSeek.get()) {
+        if (seekStatus.get() != SeekStatus.NOT_STARTED) {
             acknowledgmentsGroupingTracker.flushAndClean();
         }
 
@@ -790,7 +792,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         int currentSize;
         synchronized (this) {
             currentSize = incomingMessages.size();
-            startMessageId = clearReceiverQueue();
+            setClientCnx(cnx);
+            clearReceiverQueue(true);
             if (possibleSendToDeadLetterTopicMessages != null) {
                 possibleSendToDeadLetterTopicMessages.clear();
             }
@@ -828,7 +831,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         // synchronized this, because redeliverUnAckMessage eliminate the 
epoch inconsistency between them
         final CompletableFuture<Void> future = new CompletableFuture<>();
         synchronized (this) {
-            setClientCnx(cnx);
             ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
                     priorityLevel, consumerName, isDurable, 
startMessageIdData, metadata, readCompacted,
                     conf.isReplicateSubscriptionState(),
@@ -933,15 +935,24 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
      * Clear the internal receiver queue and returns the message id of what 
was the 1st message in the queue that was
      * not seen by the application.
      */
-    private MessageIdAdv clearReceiverQueue() {
+    private void clearReceiverQueue(boolean updateStartMessageId) {
         List<Message<?>> currentMessageQueue = new 
ArrayList<>(incomingMessages.size());
         incomingMessages.drainTo(currentMessageQueue);
         resetIncomingMessageSize();
 
-        if (duringSeek.compareAndSet(true, false)) {
-            return seekMessageId;
+        CompletableFuture<Void> seekFuture = this.seekFuture;
+        MessageIdAdv seekMessageId = this.seekMessageId;
+
+        if (seekStatus.get() != SeekStatus.NOT_STARTED) {
+            if (updateStartMessageId) {
+                startMessageId = seekMessageId;
+            }
+            if (seekStatus.compareAndSet(SeekStatus.COMPLETED, 
SeekStatus.NOT_STARTED)) {
+                internalPinnedExecutor.execute(() -> 
seekFuture.complete(null));
+            }
+            return;
         } else if (subscriptionMode == SubscriptionMode.Durable) {
-            return startMessageId;
+            return;
         }
 
         if (!currentMessageQueue.isEmpty()) {
@@ -958,15 +969,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             // release messages if they are pooled messages
             currentMessageQueue.forEach(Message::release);
-            return previousMessage;
-        } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
+            if (updateStartMessageId) {
+                startMessageId = previousMessage;
+            }
+        } else if (updateStartMessageId && 
!lastDequeuedMessageId.equals(MessageId.earliest)) {
             // If the queue was empty we need to restart from the message just 
after the last one that has been dequeued
             // in the past
-            return new BatchMessageIdImpl((MessageIdImpl) 
lastDequeuedMessageId);
-        } else {
-            // No message was received or dequeued by this consumer. Next 
message would still be the startMessageId
-            return startMessageId;
-        }
+            startMessageId = new BatchMessageIdImpl((MessageIdImpl) 
lastDequeuedMessageId);
+        } // else: No message was received or dequeued by this consumer. Next message would still be the startMessageId
     }
 
     /**
@@ -2233,25 +2243,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 .setMandatoryStop(0, TimeUnit.MILLISECONDS)
                 .create();
 
-        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
-        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
opTimeoutMs, seekFuture);
+        if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, 
SeekStatus.IN_PROGRESS)) {
+            final String message = String.format(
+                    "[%s][%s] attempting to seek operation that is already in 
progress (seek by %s)",
+                    topic, subscription, seekBy);
+            log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}",
+                    topic, subscription, seekBy);
+            return FutureUtil.failedFuture(new IllegalStateException(message));
+        }
+        seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
opTimeoutMs);
         return seekFuture;
     }
 
     private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId 
seekId, String seekBy,
-                                   final Backoff backoff, final AtomicLong 
remainingTime,
-                                   CompletableFuture<Void> seekFuture) {
+                                   final Backoff backoff, final AtomicLong 
remainingTime) {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx != null) {
-            if (!duringSeek.compareAndSet(false, true)) {
-                final String message = String.format(
-                        "[%s][%s] attempting to seek operation that is already 
in progress (seek by %s)",
-                        topic, subscription, seekBy);
-                log.warn("[{}][{}] Attempting to seek operation that is 
already in progress, cancelling {}",
-                        topic, subscription, seekBy);
-                seekFuture.completeExceptionally(new 
IllegalStateException(message));
-                return;
-            }
             MessageIdAdv originSeekMessageId = seekMessageId;
             seekMessageId = (MessageIdAdv) seekId;
             log.info("[{}][{}] Seeking subscription to {}", topic, 
subscription, seekBy);
@@ -2263,14 +2271,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 lastDequeuedMessageId = MessageId.earliest;
 
                 clearIncomingMessages();
-                seekFuture.complete(null);
+                CompletableFuture<Void> future = null;
+                synchronized (this) {
+                    if (!hasParentConsumer && cnx() == null) {
+                        // It's during reconnection, complete the seek future after connection is established
+                        seekStatus.set(SeekStatus.COMPLETED);
+                    } else {
+                        future = seekFuture;
+                        startMessageId = seekMessageId;
+                        seekStatus.set(SeekStatus.NOT_STARTED);
+                    }
+                }
+                if (future != null) {
+                    future.complete(null);
+                }
             }).exceptionally(e -> {
-                // re-set duringSeek and seekMessageId if seek failed
                 seekMessageId = originSeekMessageId;
-                duringSeek.set(false);
                 log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
 
-                seekFuture.completeExceptionally(
+                failSeek(
                         PulsarClientException.wrap(e.getCause(),
                                 String.format("Failed to seek the subscription 
%s of the topic %s to %s",
                                         subscription, topicName.toString(), 
seekBy)));
@@ -2279,7 +2298,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         } else {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             if (nextDelay <= 0) {
-                seekFuture.completeExceptionally(
+                failSeek(
                         new PulsarClientException.TimeoutException(
                                 String.format("The subscription %s of the 
topic %s could not seek "
                                         + "withing configured timeout", 
subscription, topicName.toString())));
@@ -2290,11 +2309,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.warn("[{}] [{}] Could not get connection while seek -- 
Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
-                seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
remainingTime, seekFuture);
+                seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
remainingTime);
             }, nextDelay, TimeUnit.MILLISECONDS);
         }
     }
 
+    private void failSeek(Throwable throwable) {
+        CompletableFuture<Void> seekFuture = this.seekFuture;
+        if (seekStatus.compareAndSet(SeekStatus.IN_PROGRESS, 
SeekStatus.NOT_STARTED)) {
+            seekFuture.completeExceptionally(throwable);
+        }
+    }
+
     @Override
     public CompletableFuture<Void> seekAsync(long timestamp) {
         String seekBy = String.format("the timestamp %d", timestamp);
@@ -2952,4 +2978,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerImpl.class);
 
+    @VisibleForTesting
+    enum SeekStatus {
+        NOT_STARTED,
+        IN_PROGRESS,
+        COMPLETED
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 070919c57a4..9995246c175 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -283,6 +283,7 @@ public class ConsumerImplTest {
 
         consumer.setClientCnx(cnx);
         consumer.setState(HandlerState.State.Ready);
+        consumer.seekStatus.set(ConsumerImpl.SeekStatus.NOT_STARTED);
 
         // when
         CompletableFuture<Void> firstResult = consumer.seekAsync(1L);
@@ -290,7 +291,6 @@ public class ConsumerImplTest {
 
         clientReq.complete(null);
 
-        // then
         assertTrue(firstResult.isDone());
         assertTrue(secondResult.isCompletedExceptionally());
         verify(cnx, times(1)).sendRequestWithId(any(ByteBuf.class), anyLong());

Reply via email to