This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49ff0f8b55f [fix][test] Enable testMaxPendingChunkMessages (#18542)
49ff0f8b55f is described below

commit 49ff0f8b55f3ef8a477ad7e6a63dceeb8bafee90
Author: Zike Yang <[email protected]>
AuthorDate: Tue Nov 22 16:06:28 2022 +0800

    [fix][test] Enable testMaxPendingChunkMessages (#18542)
---
 .../org/apache/pulsar/client/impl/MessageChunkingTest.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 92771c11581..f26bb6b7501 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -309,17 +309,18 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         producer.close();
     }
 
-    @Test(enabled = false)
+    @Test
     public void testMaxPendingChunkMessages() throws Exception {
 
         log.info("-- Starting {} test --", methodName);
-        this.conf.setMaxMessageSize(10);
+        this.conf.setMaxMessageSize(100);
         final int totalMessages = 25;
         final String topicName = "persistent://my-property/my-ns/maxPending";
         final int totalProducers = 25;
         @Cleanup("shutdownNow")
         ExecutorService executor = 
Executors.newFixedThreadPool(totalProducers);
 
+        @Cleanup
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 
.subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, 
TimeUnit.SECONDS)
                 
.maxPendingChunkedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true)
@@ -334,12 +335,11 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
             producers[i] = 
producerBuilder.enableChunking(true).enableBatching(false).create();
             int index = i;
             executor.submit(() -> {
-                
futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes()));
+                
futures.add(producers[index].sendAsync(createMessagePayload(450).getBytes()));
             });
         }
 
         FutureUtil.waitForAll(futures).get();
-        PersistentTopic topic = (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topicName).get().get();
 
         Message<byte[]> msg = null;
         Set<String> messageSet = new HashSet<>();
@@ -354,8 +354,12 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
             consumer.acknowledge(msg);
         }
 
+        log.info("messageSet size: {}, totalPublishedMessages: {}", 
messageSet.size(), totalPublishedMessages);
         assertNotEquals(messageSet.size(), totalPublishedMessages);
 
+        for (int i = 0; i < totalProducers; i++) {
+            producers[i].close();
+        }
     }
 
     /**

Reply via email to