ivankelly commented on a change in pull request #2103: Issue 1433: Expose batch 
flushAsync() and flush() methods in Producer
URL: https://github.com/apache/incubator-pulsar/pull/2103#discussion_r200952616
 
 

 ##########
 File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 ##########
 @@ -2552,4 +2553,74 @@ public void testConsumerSubscriptionInitialize() throws 
Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testFlushBatchEnabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/test-flush-enabled")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic("persistent://my-property/my-ns/test-flush-enabled")
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(10000);
+
+        @Cleanup
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.sendAsync(message.getBytes());
+        }
+        producer.flush();
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test
+    public void testFlushBatchDisabled() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/test-flush-disabled")
 
 Review comment:
   This line is longer than 120 characters. I know there's no checkstyle enforced, but we 
should still try to keep the code readable on GitHub.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to