This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d568606ccdb [fix][client] Fix RawReader hasMessageAvailable returns
true when no messages (#21032)
d568606ccdb is described below
commit d568606ccdb6dba398d6d0dc4102742e77a1acd0
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Aug 20 21:45:29 2023 +0800
[fix][client] Fix RawReader hasMessageAvailable returns true when no
messages (#21032)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 7 ++--
.../pulsar/client/impl/MultiTopicsReaderTest.java | 2 +-
.../pulsar/client/impl/NegativeAcksTest.java | 2 +-
.../apache/pulsar/client/impl/RawReaderTest.java | 43 ++++++++++++++--------
4 files changed, 33 insertions(+), 21 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 9a1c972b2cc..70bda888bf7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
// TODO message validation
numMsg = 1;
}
+ MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+ lastDequeuedMessageId = new
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+ messageId.getPartition(), numMsg - 1);
if (!future.complete(messageAndCnx.msg)) {
messageAndCnx.msg.close();
closeAsync();
}
- MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
- lastDequeuedMessageId = new
BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
- messageId.getPartition(), numMsg - 1);
-
ClientCnx currentCnx = cnx();
if (currentCnx == messageAndCnx.cnx) {
increaseAvailablePermits(currentCnx, numMsg);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a41aac9bd45..d9bbc6a9d74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -211,7 +211,7 @@ public class MultiTopicsReaderTest extends
MockedPulsarServiceBaseTest {
reader.close();
}
- @Test(timeOut = 10000)
+ @Test
public void testReaderWithTimeLong() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testReadFromPartition" +
UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index b4d01e263bc..7812844bdc2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -319,7 +319,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
negativeAcksTracker.close();
}
- @Test(timeOut = 10000)
+ @Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index fb09cd99518..de011ea490c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -30,8 +30,10 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
@@ -50,6 +52,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
+@Slf4j
public class RawReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "foobar-sub";
@@ -57,6 +60,12 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@BeforeMethod
@Override
public void setup() throws Exception {
+
conf.setBrokerEntryMetadataInterceptors(org.assertj.core.util.Sets.newTreeSet(
+
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
+
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
+ ));
+ conf.setSystemTopicEnabled(false);
+ conf.setExposingBrokerEntryMetadataToClientEnabled(true);
super.internalSetup();
admin.clusters().createCluster("test",
@@ -110,7 +119,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testHasMessageAvailableWithoutBatch() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
while (true) {
@@ -127,20 +136,18 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testHasMessageAvailableWithBatch() throws Exception {
int numKeys = 20;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys, true);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
int messageCount = 0;
while (true) {
boolean hasMsg = reader.hasMessageAvailableAsync().get();
- if (hasMsg && (messageCount == numKeys)) {
- Assert.fail("HasMessageAvailable shows still has message when
there is no message");
- }
if (hasMsg) {
try (RawMessage m = reader.readNextAsync().get()) {
MessageMetadata meta =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
@@ -157,13 +164,14 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
Assert.assertEquals(messageCount, numKeys);
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testRawReader() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
@@ -179,12 +187,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testSeekToStart() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys);
@@ -213,12 +222,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(readKeys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testSeekToMiddle() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys);
@@ -256,6 +266,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(readKeys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
/**
@@ -264,7 +275,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFlowControl() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages);
@@ -290,12 +301,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
Assert.assertEquals(timeouts, 1);
Assert.assertEquals(keys.size(), numMessages);
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testFlowControlBatch() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages, true);
@@ -318,11 +330,12 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertEquals(keys.size(), numMessages);
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testBatchingExtractKeysAndIds() throws Exception {
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
@@ -357,7 +370,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBatchingRebatch() throws Exception {
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
@@ -388,7 +401,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
public void testAcknowledgeWithProperties() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
@@ -419,14 +432,14 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(
ledger.openCursor(subscription).getProperties().get("foobar"),
Long.valueOf(0xdeadbeefdecaL)));
-
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testReadCancellationOnClose() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys/2);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();