This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d82b09128f [fix] [client] fix reader.hasMessageAvailable return false 
when incoming queue is not empty (#21259)
6d82b09128f is described below

commit 6d82b09128f46fdcb27021560d773fac15d66a48
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 28 23:45:39 2023 +0800

    [fix] [client] fix reader.hasMessageAvailable return false when incoming 
queue is not empty (#21259)
    
    ### Motivation
    
    Reproduce steps:
    - Create a reader.
    - The reader pulls messages into the `incoming queue`; do not call 
`reader.readNext` yet.
    - The trim-ledgers task deletes the ledgers, so there are no messages left in the topic.
    - Now, you can still get messages if you call `reader.readNext`, but the method 
`reader.hasMessageAvailable` returns `false`.
    
    Note: a similar issue in `MultiTopicsConsumerImpl` was fixed by 
https://github.com/apache/pulsar/pull/13332; this PR only fixes the 
issue in `ConsumerImpl`.
    
    ### Modifications
    
    Make `reader.hasMessageAvailable` return `true` when `incoming queue` is 
not empty.
---
 .../client/api/NonDurableSubscriptionTest.java     | 87 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  4 +
 2 files changed, 91 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 6375f79bfbb..b9e3e6dcebe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -19,17 +19,29 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.PulsarChannelInitializer;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -254,4 +266,79 @@ public class NonDurableSubscriptionTest  extends 
ProducerConsumerBase {
 
         assertEquals(numFlow.get(), numPartitions);
     }
+
+    private void trimLedgers(final String tpName) {
+        // Wait for topic loading.
+        org.awaitility.Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            assertNotNull(persistentTopic);
+        });
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        CompletableFuture<Void> trimLedgersTask = new CompletableFuture<>();
+        ml.trimConsumedLedgersInBackground(trimLedgersTask);
+        trimLedgersTask.join();
+    }
+
+    private void switchLedgerManually(final String tpName) throws Exception {
+        Method ledgerClosed =
+                ManagedLedgerImpl.class.getDeclaredMethod("ledgerClosed", new 
Class[]{LedgerHandle.class});
+        Method createLedgerAfterClosed =
+                
ManagedLedgerImpl.class.getDeclaredMethod("createLedgerAfterClosed", new 
Class[0]);
+        ledgerClosed.setAccessible(true);
+        createLedgerAfterClosed.setAccessible(true);
+
+        // Wait for topic create.
+        org.awaitility.Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic =
+                    (PersistentTopic) 
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+            assertNotNull(persistentTopic);
+        });
+
+        // Switch ledger.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        LedgerHandle currentLedger1 = WhiteboxImpl.getInternalState(ml, 
"currentLedger");
+        ledgerClosed.invoke(ml, new Object[]{currentLedger1});
+        createLedgerAfterClosed.invoke(ml, new Object[0]);
+        Awaitility.await().untilAsserted(() -> {
+            LedgerHandle currentLedger2 = WhiteboxImpl.getInternalState(ml, 
"currentLedger");
+            assertNotEquals(currentLedger1.getId(), currentLedger2.getId());
+        });
+    }
+
+    @Test
+    public void testTrimLedgerIfNoDurableCursor() throws Exception {
+        final String nonDurableCursor = "non-durable-cursor";
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        Reader<String> reader = 
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+                
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        MessageIdImpl msgSent = (MessageIdImpl) producer.send("1");
+
+        // Trigger switch ledger.
+        // Trigger a trim ledgers task, and verify trim ledgers successful.
+        switchLedgerManually(topicName);
+        trimLedgers(topicName);
+
+        // Since there is one message in the incoming queue, so the method 
"reader.hasMessageAvailable" should return
+        // true.
+        boolean hasMessageAvailable = reader.hasMessageAvailable();
+        Message<String> msgReceived = reader.readNext(2, TimeUnit.SECONDS);
+        if (msgReceived == null) {
+            assertFalse(hasMessageAvailable);
+        } else {
+            log.info("receive msg: {}", msgReceived.getValue());
+            assertTrue(hasMessageAvailable);
+            assertEquals(msgReceived.getValue(), "1");
+        }
+
+        // cleanup.
+        reader.close();
+        producer.close();
+        admin.topics().delete(topicName);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index aa3340c6078..ded6a546c24 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2335,6 +2335,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     public CompletableFuture<Boolean> hasMessageAvailableAsync() {
         final CompletableFuture<Boolean> booleanFuture = new 
CompletableFuture<>();
 
+        if (incomingMessages != null && !incomingMessages.isEmpty()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
         // we haven't read yet. use startMessageId for comparison
         if (lastDequeuedMessageId == MessageId.earliest) {
             // if we are starting from latest, we should seek to the actual 
last message first.

Reply via email to