This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ca6aa1cef [fix][client] Fix RawReader hasMessageAvailable returns 
true when no messages (#21032)
00ca6aa1cef is described below

commit 00ca6aa1cef950bd9ea5014d4332d16d3c8d37e2
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Aug 20 21:45:29 2023 +0800

    [fix][client] Fix RawReader hasMessageAvailable returns true when no 
messages (#21032)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  7 ++--
 .../pulsar/client/impl/MultiTopicsReaderTest.java  |  2 +-
 .../pulsar/client/impl/NegativeAcksTest.java       |  2 +-
 .../apache/pulsar/client/impl/RawReaderTest.java   | 40 +++++++++++++---------
 4 files changed, 29 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 9a1c972b2cc..70bda888bf7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
                     // TODO message validation
                     numMsg = 1;
                 }
+                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+                lastDequeuedMessageId = new 
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        messageId.getPartition(), numMsg - 1);
                 if (!future.complete(messageAndCnx.msg)) {
                     messageAndCnx.msg.close();
                     closeAsync();
                 }
-                MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
-                lastDequeuedMessageId = new 
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
-                    messageId.getPartition(), numMsg - 1);
-
                 ClientCnx currentCnx = cnx();
                 if (currentCnx == messageAndCnx.cnx) {
                     increaseAvailablePermits(currentCnx, numMsg);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a41aac9bd45..d9bbc6a9d74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -211,7 +211,7 @@ public class MultiTopicsReaderTest extends 
MockedPulsarServiceBaseTest {
         reader.close();
     }
 
-    @Test(timeOut = 10000)
+    @Test
     public void testReaderWithTimeLong() throws Exception {
         String ns = "my-property/my-ns";
         String topic = "persistent://" + ns + "/testReadFromPartition" + 
UUID.randomUUID();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 876fa98bce4..a6b77a1c727 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -321,7 +321,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         negativeAcksTracker.close();
     }
 
-    @Test(timeOut = 10000)
+    @Test
     public void testNegativeAcksWithBatchAckEnabled() throws Exception {
         cleanup();
         conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index a201ef104e7..95d8926c9ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -30,8 +30,10 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
@@ -51,6 +53,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
+@Slf4j
 public class RawReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "foobar-sub";
@@ -62,6 +65,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
                 
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
                 
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
         ));
+        conf.setSystemTopicEnabled(false);
         conf.setExposingBrokerEntryMetadataToClientEnabled(true);
         super.internalSetup();
 
@@ -116,7 +120,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testHasMessageAvailableWithoutBatch() throws Exception {
         int numKeys = 10;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
         Set<String> keys = publishMessages(topic, numKeys);
         RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
         while (true) {
@@ -133,20 +137,18 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         Assert.assertTrue(keys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testHasMessageAvailableWithBatch() throws Exception {
         int numKeys = 20;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
         Set<String> keys = publishMessages(topic, numKeys, true);
         RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();
         int messageCount = 0;
         while (true) {
             boolean hasMsg = reader.hasMessageAvailableAsync().get();
-            if (hasMsg && (messageCount == numKeys)) {
-                Assert.fail("HasMessageAvailable shows still has message when 
there is no message");
-            }
             if (hasMsg) {
                 try (RawMessage m = reader.readNextAsync().get()) {
                     MessageMetadata meta = 
Commands.parseMessageMetadata(m.getHeadersAndPayload());
@@ -163,13 +165,14 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         }
         Assert.assertEquals(messageCount, numKeys);
         Assert.assertTrue(keys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testRawReader() throws Exception {
         int numKeys = 10;
 
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         Set<String> keys = publishMessages(topic, numKeys);
 
@@ -185,12 +188,13 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         Assert.assertTrue(keys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testSeekToStart() throws Exception {
         int numKeys = 10;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numKeys);
 
@@ -219,12 +223,13 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         Assert.assertTrue(readKeys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testSeekToMiddle() throws Exception {
         int numKeys = 10;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numKeys);
 
@@ -262,6 +267,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         Assert.assertTrue(readKeys.isEmpty());
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     /**
@@ -270,7 +276,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testFlowControl() throws Exception {
         int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numMessages);
 
@@ -296,12 +302,13 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         }
         Assert.assertEquals(timeouts, 1);
         Assert.assertEquals(keys.size(), numMessages);
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testFlowControlBatch() throws Exception {
         int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         publishMessages(topic, numMessages, true);
 
@@ -324,11 +331,12 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
             }
         }
         Assert.assertEquals(keys.size(), numMessages);
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testBatchingExtractKeysAndIds() throws Exception {
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic)
             .maxPendingMessages(3)
@@ -363,7 +371,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBatchingRebatch() throws Exception {
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic)
             .maxPendingMessages(3)
@@ -392,7 +400,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testBatchingRebatchWithBrokerEntryMetadata() throws Exception {
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic)
                 .maxPendingMessages(3)
@@ -428,7 +436,7 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
     public void testAcknowledgeWithProperties() throws Exception {
         int numKeys = 10;
 
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
 
         Set<String> keys = publishMessages(topic, numKeys);
 
@@ -459,14 +467,14 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
                         Assert.assertEquals(
                                 
ledger.openCursor(subscription).getProperties().get("foobar"),
                                 Long.valueOf(0xdeadbeefdecaL)));
-
+        reader.closeAsync().get(3, TimeUnit.SECONDS);
     }
 
     @Test
     public void testReadCancellationOnClose() throws Exception {
         int numKeys = 10;
 
-        String topic = "persistent://my-property/my-ns/my-raw-topic";
+        String topic = "persistent://my-property/my-ns/" + 
BrokerTestUtil.newUniqueName("reader");
         publishMessages(topic, numKeys/2);
 
         RawReader reader = RawReader.create(pulsarClient, topic, 
subscription).get();

Reply via email to