This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c83d04f315ea0b43bf2f81b914f6ffaf407cba5b
Author: wenbingshen <[email protected]>
AuthorDate: Tue Mar 15 18:21:56 2022 +0800

    Fix PartitionedProducerImpl flushAsync always failing after one partition's
    send fails with a TimeoutException (#14602)
    
    Fixes #14598
    
    Master Issue: #14598
    
    A detailed description of the issue can be found at
    https://github.com/apache/pulsar/issues/14598
    
    Once the application has obtained the `lastSendFuture` returned by
    `org.apache.pulsar.client.impl.ProducerImpl#flushAsync`, that future's
    outcome must not be reported to the application again. Whether the future
    completed normally or exceptionally, the application may observe its result
    only once. Otherwise
    `org.apache.pulsar.client.impl.PartitionedProducerImpl#flushAsync` keeps
    failing with the same exception until new data is sent through the failed
    `ProducerImpl`.
    
    (cherry picked from commit ddca8521ed30aaa719d64077b65c4a24fb82ce5c)
---
 .../client/api/SimpleProducerConsumerTest.java     | 78 ++++++++++++++++++++++
 .../client/impl/PartitionedProducerImpl.java       |  6 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 42 +++++++++++-
 3 files changed, 123 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index fc238d1..ce5ba65 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -607,6 +608,83 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "batch")
+    public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        int numPartitions = 6;
+        TopicName topicName = 
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+        admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString())
+                .subscriptionName("my-subscriber-name").subscribe();
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+                .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+        if (batchMessageDelayMs != 0) {
+            producerBuilder.enableBatching(true);
+            producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, 
TimeUnit.MILLISECONDS);
+            producerBuilder.batchingMaxMessages(5);
+        }
+
+        @Cleanup
+        PartitionedProducerImpl<byte[]> partitionedProducer =
+                (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+        final String message = "my-message";
+        // 1. Trigger the send timeout
+        stopBroker();
+
+        partitionedProducer.sendAsync(message.getBytes());
+
+        String exceptionMessage = "";
+        try {
+            // 2. execute flush to get results,
+            // it should be failed because step 1
+            partitionedProducer.flush();
+            Assert.fail("Send operation should have failed");
+        } catch (PulsarClientException e) {
+            exceptionMessage = e.getMessage();
+        }
+
+        // 3. execute flush to get results,
+        // it shouldn't fail because we already handled the exception in the 
step 2, unless we keep sending data.
+        partitionedProducer.flush();
+        // 4. execute flushAsync, we only catch the exception once,
+        // but by getting the original lastSendFuture twice below,
+        // the same exception information must be caught twice to verify that 
our handleOnce works as expected.
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+        try {
+            partitionedProducer.getOriginalLastSendFuture().get();
+            Assert.fail("Send operation should have failed");
+        } catch (Exception e) {
+            Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), 
exceptionMessage);
+        }
+
+        startBroker();
+
+        // 5. We should not have received any message
+        Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS);
+        Assert.assertNull(msg);
+
+        // 6. We keep sending data after connection reconnected.
+        partitionedProducer.sendAsync(message.getBytes());
+        // 7. This flush operation must succeed.
+        partitionedProducer.flush();
+
+        // 8. We should have received message
+        msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        Assert.assertEquals(new String(msg.getData()), message);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     @Test
     public void testInvalidSequence() throws Exception {
         log.info("-- Starting {} test --", methodName);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 216d775..b120550 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -443,4 +443,10 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         return partitionsAutoUpdateTimeout;
     }
 
+    @VisibleForTesting
+    public CompletableFuture<Void> getOriginalLastSendFuture() {
+        return CompletableFuture.allOf(
+                
producers.values().stream().map(ProducerImpl::getOriginalLastSendFuture)
+                        .toArray(CompletableFuture[]::new));
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index dc651f3..2637c49 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -54,6 +54,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
@@ -113,6 +114,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = 
CompletableFuture.completedFuture(null);
+    private LastSendFutureWrapper lastSendFutureWrapper = 
LastSendFutureWrapper.create(lastSendFuture);
 
     // Globally unique producer name
     private String producerName;
@@ -883,6 +885,31 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         };
     }
 
+    private static final class LastSendFutureWrapper {
+        private final CompletableFuture<MessageId> lastSendFuture;
+        private static final int FALSE = 0;
+        private static final int TRUE = 1;
+        private static final AtomicIntegerFieldUpdater<LastSendFutureWrapper> 
THROW_ONCE_UPDATER =
+                
AtomicIntegerFieldUpdater.newUpdater(LastSendFutureWrapper.class, "throwOnce");
+        private volatile int throwOnce = FALSE;
+
+        private LastSendFutureWrapper(CompletableFuture<MessageId> 
lastSendFuture) {
+            this.lastSendFuture = lastSendFuture;
+        }
+        static LastSendFutureWrapper create(CompletableFuture<MessageId> 
lastSendFuture) {
+            return new LastSendFutureWrapper(lastSendFuture);
+        }
+        public CompletableFuture<Void> handleOnce() {
+            return lastSendFuture.handle((ignore, t) -> {
+                if (t != null && THROW_ONCE_UPDATER.compareAndSet(this, FALSE, 
TRUE)) {
+                    throw FutureUtil.wrapToCompletionException(t);
+                }
+                return null;
+            });
+        }
+    }
+
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         final State currentState = getAndUpdateState(state -> {
@@ -1840,14 +1867,17 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
     @Override
     public CompletableFuture<Void> flushAsync() {
-        CompletableFuture<MessageId> lastSendFuture;
         synchronized (ProducerImpl.this) {
             if (isBatchMessagingEnabled()) {
                 batchMessageAndSend();
             }
-            lastSendFuture = this.lastSendFuture;
+            CompletableFuture<MessageId>  lastSendFuture = this.lastSendFuture;
+            if (!(lastSendFuture == 
this.lastSendFutureWrapper.lastSendFuture)) {
+                this.lastSendFutureWrapper = 
LastSendFutureWrapper.create(lastSendFuture);
+            }
         }
-        return lastSendFuture.thenApply(ignored -> null);
+
+        return this.lastSendFutureWrapper.handleOnce();
     }
 
     @Override
@@ -2055,5 +2085,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return errorState;
     }
 
+    @VisibleForTesting
+    CompletableFuture<Void> getOriginalLastSendFuture() {
+        CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture;
+        return lastSendFuture.thenApply(ignore -> null);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProducerImpl.class);
 }

Reply via email to