This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d82b09128f [fix] [client] fix reader.hasMessageAvailable return false
when incoming queue is not empty (#21259)
6d82b09128f is described below
commit 6d82b09128f46fdcb27021560d773fac15d66a48
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 28 23:45:39 2023 +0800
[fix] [client] fix reader.hasMessageAvailable return false when incoming
queue is not empty (#21259)
### Motivation
Reproduce steps:
- Create a reader.
- Reader pulls messages into `incoming queue`, do not call
`reader.readNext` now.
- Trim ledger task will delete the ledgers, then there is no in the topic.
- Now, you can get messages if you call `reader.readNext`, but the method
`reader.hasMessageAvailable` return `false`
Note: the similar issue of `MultiTopicsConsumerImpl` has been fixed by
https://github.com/apache/pulsar/pull/13332, current PR only trying to fix the
issue of `ConsumerImpl`.
### Modifications
Make `reader.hasMessageAvailable` return `true` when `incoming queue` is
not empty.
---
.../client/api/NonDurableSubscriptionTest.java | 87 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 4 +
2 files changed, 91 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index 6375f79bfbb..b9e3e6dcebe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -19,17 +19,29 @@
package org.apache.pulsar.client.api;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -254,4 +266,79 @@ public class NonDurableSubscriptionTest extends
ProducerConsumerBase {
assertEquals(numFlow.get(), numPartitions);
}
+
+ private void trimLedgers(final String tpName) {
+ // Wait for topic loading.
+ org.awaitility.Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+ assertNotNull(persistentTopic);
+ });
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ CompletableFuture<Void> trimLedgersTask = new CompletableFuture<>();
+ ml.trimConsumedLedgersInBackground(trimLedgersTask);
+ trimLedgersTask.join();
+ }
+
+ private void switchLedgerManually(final String tpName) throws Exception {
+ Method ledgerClosed =
+ ManagedLedgerImpl.class.getDeclaredMethod("ledgerClosed", new
Class[]{LedgerHandle.class});
+ Method createLedgerAfterClosed =
+
ManagedLedgerImpl.class.getDeclaredMethod("createLedgerAfterClosed", new
Class[0]);
+ ledgerClosed.setAccessible(true);
+ createLedgerAfterClosed.setAccessible(true);
+
+ // Wait for topic create.
+ org.awaitility.Awaitility.await().untilAsserted(() -> {
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar.getBrokerService().getTopic(tpName, false).join().get();
+ assertNotNull(persistentTopic);
+ });
+
+ // Switch ledger.
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
+ ManagedLedgerImpl ml = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ LedgerHandle currentLedger1 = WhiteboxImpl.getInternalState(ml,
"currentLedger");
+ ledgerClosed.invoke(ml, new Object[]{currentLedger1});
+ createLedgerAfterClosed.invoke(ml, new Object[0]);
+ Awaitility.await().untilAsserted(() -> {
+ LedgerHandle currentLedger2 = WhiteboxImpl.getInternalState(ml,
"currentLedger");
+ assertNotEquals(currentLedger1.getId(), currentLedger2.getId());
+ });
+ }
+
+ @Test
+ public void testTrimLedgerIfNoDurableCursor() throws Exception {
+ final String nonDurableCursor = "non-durable-cursor";
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ Reader<String> reader =
pulsarClient.newReader(Schema.STRING).topic(topicName).receiverQueueSize(1)
+
.subscriptionName(nonDurableCursor).startMessageId(MessageIdImpl.earliest).create();
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ MessageIdImpl msgSent = (MessageIdImpl) producer.send("1");
+
+ // Trigger switch ledger.
+ // Trigger a trim ledgers task, and verify trim ledgers successful.
+ switchLedgerManually(topicName);
+ trimLedgers(topicName);
+
+ // Since there is one message in the incoming queue, so the method
"reader.hasMessageAvailable" should return
+ // true.
+ boolean hasMessageAvailable = reader.hasMessageAvailable();
+ Message<String> msgReceived = reader.readNext(2, TimeUnit.SECONDS);
+ if (msgReceived == null) {
+ assertFalse(hasMessageAvailable);
+ } else {
+ log.info("receive msg: {}", msgReceived.getValue());
+ assertTrue(hasMessageAvailable);
+ assertEquals(msgReceived.getValue(), "1");
+ }
+
+ // cleanup.
+ reader.close();
+ producer.close();
+ admin.topics().delete(topicName);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index aa3340c6078..ded6a546c24 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2335,6 +2335,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
final CompletableFuture<Boolean> booleanFuture = new
CompletableFuture<>();
+ if (incomingMessages != null && !incomingMessages.isEmpty()) {
+ return CompletableFuture.completedFuture(true);
+ }
+
// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual
last message first.