This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 00ca6aa1cef [fix][client] Fix RawReader hasMessageAvailable returns
true when no messages (#21032)
00ca6aa1cef is described below
commit 00ca6aa1cef950bd9ea5014d4332d16d3c8d37e2
Author: Jiwei Guo <[email protected]>
AuthorDate: Sun Aug 20 21:45:29 2023 +0800
[fix][client] Fix RawReader hasMessageAvailable returns true when no
messages (#21032)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 7 ++--
.../pulsar/client/impl/MultiTopicsReaderTest.java | 2 +-
.../pulsar/client/impl/NegativeAcksTest.java | 2 +-
.../apache/pulsar/client/impl/RawReaderTest.java | 40 +++++++++++++---------
4 files changed, 29 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 9a1c972b2cc..70bda888bf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -151,14 +151,13 @@ public class RawReaderImpl implements RawReader {
// TODO message validation
numMsg = 1;
}
+ MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
+ lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+ messageId.getPartition(), numMsg - 1);
if (!future.complete(messageAndCnx.msg)) {
messageAndCnx.msg.close();
closeAsync();
}
- MessageIdData messageId = messageAndCnx.msg.getMessageIdData();
- lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
- messageId.getPartition(), numMsg - 1);
-
ClientCnx currentCnx = cnx();
if (currentCnx == messageAndCnx.cnx) {
increaseAvailablePermits(currentCnx, numMsg);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a41aac9bd45..d9bbc6a9d74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -211,7 +211,7 @@ public class MultiTopicsReaderTest extends
MockedPulsarServiceBaseTest {
reader.close();
}
- @Test(timeOut = 10000)
+ @Test
public void testReaderWithTimeLong() throws Exception {
String ns = "my-property/my-ns";
String topic = "persistent://" + ns + "/testReadFromPartition" +
UUID.randomUUID();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 876fa98bce4..a6b77a1c727 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -321,7 +321,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
negativeAcksTracker.close();
}
- @Test(timeOut = 10000)
+ @Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index a201ef104e7..95d8926c9ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -30,8 +30,10 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
@@ -51,6 +53,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
+@Slf4j
public class RawReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "foobar-sub";
@@ -62,6 +65,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
));
+ conf.setSystemTopicEnabled(false);
conf.setExposingBrokerEntryMetadataToClientEnabled(true);
super.internalSetup();
@@ -116,7 +120,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testHasMessageAvailableWithoutBatch() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
while (true) {
@@ -133,20 +137,18 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testHasMessageAvailableWithBatch() throws Exception {
int numKeys = 20;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys, true);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();
int messageCount = 0;
while (true) {
boolean hasMsg = reader.hasMessageAvailableAsync().get();
- if (hasMsg && (messageCount == numKeys)) {
- Assert.fail("HasMessageAvailable shows still has message when there is no message");
- }
if (hasMsg) {
try (RawMessage m = reader.readNextAsync().get()) {
MessageMetadata meta =
Commands.parseMessageMetadata(m.getHeadersAndPayload());
@@ -163,13 +165,14 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
Assert.assertEquals(messageCount, numKeys);
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testRawReader() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
@@ -185,12 +188,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(keys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testSeekToStart() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys);
@@ -219,12 +223,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(readKeys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testSeekToMiddle() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys);
@@ -262,6 +267,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertTrue(readKeys.isEmpty());
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
/**
@@ -270,7 +276,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testFlowControl() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages);
@@ -296,12 +302,13 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
Assert.assertEquals(timeouts, 1);
Assert.assertEquals(keys.size(), numMessages);
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testFlowControlBatch() throws Exception {
int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numMessages, true);
@@ -324,11 +331,12 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
}
}
Assert.assertEquals(keys.size(), numMessages);
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testBatchingExtractKeysAndIds() throws Exception {
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
@@ -363,7 +371,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBatchingRebatch() throws Exception {
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
@@ -392,7 +400,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
@Test
public void testBatchingRebatchWithBrokerEntryMetadata() throws Exception {
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
try (Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic)
.maxPendingMessages(3)
@@ -428,7 +436,7 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
public void testAcknowledgeWithProperties() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
Set<String> keys = publishMessages(topic, numKeys);
@@ -459,14 +467,14 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(
ledger.openCursor(subscription).getProperties().get("foobar"),
Long.valueOf(0xdeadbeefdecaL)));
-
+ reader.closeAsync().get(3, TimeUnit.SECONDS);
}
@Test
public void testReadCancellationOnClose() throws Exception {
int numKeys = 10;
- String topic = "persistent://my-property/my-ns/my-raw-topic";
+ String topic = "persistent://my-property/my-ns/" +
BrokerTestUtil.newUniqueName("reader");
publishMessages(topic, numKeys/2);
RawReader reader = RawReader.create(pulsarClient, topic,
subscription).get();