wenbingshen commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r825579419



##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -606,6 +607,82 @@ public void testSendTimeout(int batchMessageDelayMs) 
throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 6;
+        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        if (batchMessageDelayMs != 0) {
+            producerBuilder.enableBatching(true);
+            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
+            producerBuilder.batchingMaxMessages(5);
+        }
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+        final String message = "my-message";
+
+        // 1. Trigger the send timeout
+        stopBroker();
+
+        partitionedProducer.sendAsync(message.getBytes());
+
+        String exceptionMessage = "";
+        try {
+            // 2. execute flush to get results,
+            // it should be failed because step 1
+            partitionedProducer.flush();
+            Assert.fail("Send operation should have failed");
+        } catch (PulsarClientException e) {
+            exceptionMessage = e.getMessage();
+        }
+
+        // 3. execute flush to get results,
+        // it shouldn't fail because we already handled the exception in the step 2, unless we keep sending data.
+        partitionedProducer.flush();
+        // 4. execute flushAsync, we only catch the exception once,
+        // but by getting the original lastSendFuture twice below,
+        // the same exception information must be caught twice to verify that our handleOnce works as expected.
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+
+        startBroker();
+
+        // 5. We should not have received any message
+        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+
+        // 6. We keep sending data after connection reconnected.
+        partitionedProducer.sendAsync(message.getBytes());
+        // 7. This flush operation must succeed.
+        partitionedProducer.flush();

Review comment:
       Okay. I have closed the producer.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -606,6 +607,82 @@ public void testSendTimeout(int batchMessageDelayMs) 
throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 6;
+        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        if (batchMessageDelayMs != 0) {
+            producerBuilder.enableBatching(true);
+            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
+            producerBuilder.batchingMaxMessages(5);
+        }
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+        final String message = "my-message";
+
+        // 1. Trigger the send timeout
+        stopBroker();
+
+        partitionedProducer.sendAsync(message.getBytes());
+
+        String exceptionMessage = "";
+        try {
+            // 2. execute flush to get results,
+            // it should be failed because step 1
+            partitionedProducer.flush();
+            Assert.fail("Send operation should have failed");
+        } catch (PulsarClientException e) {
+            exceptionMessage = e.getMessage();
+        }
+
+        // 3. execute flush to get results,
+        // it shouldn't fail because we already handled the exception in the step 2, unless we keep sending data.
+        partitionedProducer.flush();
+        // 4. execute flushAsync, we only catch the exception once,
+        // but by getting the original lastSendFuture twice below,
+        // the same exception information must be caught twice to verify that our handleOnce works as expected.
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+
+        startBroker();
+
+        // 5. We should not have received any message
+        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+
+        // 6. We keep sending data after connection reconnected.
+        partitionedProducer.sendAsync(message.getBytes());
+        // 7. This flush operation must succeed.
+        partitionedProducer.flush();

Review comment:
       Okay. I have closed the producer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to