This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 49ff0f8b55f [fix][test] Enable testMaxPendingChunkMessages (#18542)
49ff0f8b55f is described below
commit 49ff0f8b55f3ef8a477ad7e6a63dceeb8bafee90
Author: Zike Yang <[email protected]>
AuthorDate: Tue Nov 22 16:06:28 2022 +0800
[fix][test] Enable testMaxPendingChunkMessages (#18542)
---
.../org/apache/pulsar/client/impl/MessageChunkingTest.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 92771c11581..f26bb6b7501 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -309,17 +309,18 @@ public class MessageChunkingTest extends ProducerConsumerBase {
producer.close();
}
- @Test(enabled = false)
+ @Test
public void testMaxPendingChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
- this.conf.setMaxMessageSize(10);
+ this.conf.setMaxMessageSize(100);
final int totalMessages = 25;
final String topicName = "persistent://my-property/my-ns/maxPending";
final int totalProducers = 25;
@Cleanup("shutdownNow")
ExecutorService executor =
Executors.newFixedThreadPool(totalProducers);
+ @Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0,
TimeUnit.SECONDS)
.maxPendingChunkedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true)
@@ -334,12 +335,11 @@ public class MessageChunkingTest extends ProducerConsumerBase {
producers[i] =
producerBuilder.enableChunking(true).enableBatching(false).create();
int index = i;
executor.submit(() -> {
- futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes()));
+ futures.add(producers[index].sendAsync(createMessagePayload(450).getBytes()));
});
}
FutureUtil.waitForAll(futures).get();
- PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
Message<byte[]> msg = null;
Set<String> messageSet = new HashSet<>();
@@ -354,8 +354,12 @@ public class MessageChunkingTest extends ProducerConsumerBase {
consumer.acknowledge(msg);
}
+ log.info("messageSet size: {}, totalPublishedMessages: {}", messageSet.size(), totalPublishedMessages);
assertNotEquals(messageSet.size(), totalPublishedMessages);
+ for (int i = 0; i < totalProducers; i++) {
+ producers[i].close();
+ }
}
/**