codelipenghui commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r824569545



##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Producer.java
##########
@@ -199,4 +199,9 @@
      * @return the number of partitions per topic.
      */
     int getNumOfPartitions();
+
+    /**
+     * @return Get the result of the last sent data
+     */
+    CompletableFuture<Void> getOriginalLastSendFuture();

Review comment:
       Do we need to expose this method to the Producer API?  If it is 
necessary to add a new API, I think we'd better start with a proposal. From 
this PR, looks like we can make it internally, not a public API.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -606,6 +607,82 @@ public void testSendTimeout(int batchMessageDelayMs) 
throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 6;
+        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        if (batchMessageDelayMs != 0) {
+            producerBuilder.enableBatching(true);
+            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
+            producerBuilder.batchingMaxMessages(5);
+        }
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+        final String message = "my-message";
+
+        // 1. Trigger the send timeout
+        stopBroker();

Review comment:
       You can set the channel auto_read to false to simulate the send timeout, 
stop/start the broker might lead to the test run with longer time.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -935,6 +937,26 @@ protected WriteInEventLoopCallback 
newObject(Handle<WriteInEventLoopCallback> ha
         };
     }
 
+    private static final class LastSendFutureWrapper {
+        private final CompletableFuture<MessageId> lastSendFuture;
+        private final AtomicBoolean throwOnce = new AtomicBoolean(false);

Review comment:
       Use the AtomicLongFieldUpdater to reduce the AtomicBoolean objects.

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -606,6 +607,82 @@ public void testSendTimeout(int batchMessageDelayMs) 
throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 6;
+        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        if (batchMessageDelayMs != 0) {
+            producerBuilder.enableBatching(true);
+            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
+            producerBuilder.batchingMaxMessages(5);
+        }
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+        final String message = "my-message";
+
+        // 1. Trigger the send timeout
+        stopBroker();
+
+        partitionedProducer.sendAsync(message.getBytes());
+
+        String exceptionMessage = "";
+        try {
+            // 2. execute flush to get results,
+            // it should be failed because step 1
+            partitionedProducer.flush();
+            Assert.fail("Send operation should have failed");
+        } catch (PulsarClientException e) {
+            exceptionMessage = e.getMessage();
+        }
+
+        // 3. execute flush to get results,
+        // it shouldn't fail because we already handled the exception in the 
step 2, unless we keep sending data.
+        partitionedProducer.flush();
+        // 4. execute flushAsync, we only catch the exception once,
+        // but by getting the original lastSendFuture twice below,
+        // the same exception information must be caught twice to verify that 
our handleOnce works as expected.
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+
+        startBroker();
+
+        // 5. We should not have received any message
+        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+
+        // 6. We keep sending data after connection reconnected.
+        partitionedProducer.sendAsync(message.getBytes());
+        // 7. This flush operation must succeed.
+        partitionedProducer.flush();

Review comment:
       The producer should be closed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to