This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c83d04f315ea0b43bf2f81b914f6ffaf407cba5b Author: wenbingshen <[email protected]> AuthorDate: Tue Mar 15 18:21:56 2022 +0800 Fix PartitionedProducerImpl flushAsync always fail when one partition send TimeOutException (#14602) Fixes #14598 Master Issue: #14598 A detailed description of the issue can be found at https://github.com/apache/pulsar/issues/14598 Once the application has obtained the result of the `lastSendFuture` returned by `org.apache.pulsar.client.impl.ProducerImpl#flushAsync`, that same failure should not be thrown to the application again: if the `lastSendFuture` completed exceptionally, its exception must be surfaced to the application only once. Otherwise, `org.apache.pulsar.client.impl.PartitionedProducerImpl#flushAsync` keeps failing with the same stale exception and cannot succeed again until new data is sent to the failed `ProducerImpl`. (cherry picked from commit ddca8521ed30aaa719d64077b65c4a24fb82ce5c) --- .../client/api/SimpleProducerConsumerTest.java | 78 ++++++++++++++++++++++ .../client/impl/PartitionedProducerImpl.java | 6 ++ .../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++- 3 files changed, 123 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index fc238d1..ce5ba65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -88,6 +88,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import 
org.apache.pulsar.client.impl.crypto.MessageCryptoBc; @@ -607,6 +608,83 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @Test(dataProvider = "batch") + public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws Exception { + log.info("-- Starting {} test --", methodName); + + int numPartitions = 6; + TopicName topicName = TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1"); + admin.topics().createPartitionedTopic(topicName.toString(), numPartitions); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) + .subscriptionName("my-subscriber-name").subscribe(); + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() + .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS); + + if (batchMessageDelayMs != 0) { + producerBuilder.enableBatching(true); + producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); + producerBuilder.batchingMaxMessages(5); + } + + @Cleanup + PartitionedProducerImpl<byte[]> partitionedProducer = + (PartitionedProducerImpl<byte[]>) producerBuilder.create(); + final String message = "my-message"; + // 1. Trigger the send timeout + stopBroker(); + + partitionedProducer.sendAsync(message.getBytes()); + + String exceptionMessage = ""; + try { + // 2. execute flush to get results, + // it should be failed because step 1 + partitionedProducer.flush(); + Assert.fail("Send operation should have failed"); + } catch (PulsarClientException e) { + exceptionMessage = e.getMessage(); + } + + // 3. execute flush to get results, + // it shouldn't fail because we already handled the exception in the step 2, unless we keep sending data. + partitionedProducer.flush(); + // 4. 
execute flushAsync, we only catch the exception once, + // but by getting the original lastSendFuture twice below, + // the same exception information must be caught twice to verify that our handleOnce works as expected. + try { + partitionedProducer.getOriginalLastSendFuture().get(); + Assert.fail("Send operation should have failed"); + } catch (Exception e) { + Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), exceptionMessage); + } + try { + partitionedProducer.getOriginalLastSendFuture().get(); + Assert.fail("Send operation should have failed"); + } catch (Exception e) { + Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(), exceptionMessage); + } + + startBroker(); + + // 5. We should not have received any message + Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + Assert.assertNull(msg); + + // 6. We keep sending data after connection reconnected. + partitionedProducer.sendAsync(message.getBytes()); + // 7. This flush operation must succeed. + partitionedProducer.flush(); + + // 8. 
We should have received message + msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(new String(msg.getData()), message); + + log.info("-- Exiting {} test --", methodName); + } + @Test public void testInvalidSequence() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 216d775..b120550 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -443,4 +443,10 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { return partitionsAutoUpdateTimeout; } + @VisibleForTesting + public CompletableFuture<Void> getOriginalLastSendFuture() { + return CompletableFuture.allOf( + producers.values().stream().map(ProducerImpl::getOriginalLastSendFuture) + .toArray(CompletableFuture[]::new)); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index dc651f3..2637c49 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -54,6 +54,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; @@ -113,6 +114,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private final 
BatchMessageContainerBase batchMessageContainer; private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null); + private LastSendFutureWrapper lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture); // Globally unique producer name private String producerName; @@ -883,6 +885,31 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne }; } + private static final class LastSendFutureWrapper { + private final CompletableFuture<MessageId> lastSendFuture; + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater<LastSendFutureWrapper> THROW_ONCE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(LastSendFutureWrapper.class, "throwOnce"); + private volatile int throwOnce = FALSE; + + private LastSendFutureWrapper(CompletableFuture<MessageId> lastSendFuture) { + this.lastSendFuture = lastSendFuture; + } + static LastSendFutureWrapper create(CompletableFuture<MessageId> lastSendFuture) { + return new LastSendFutureWrapper(lastSendFuture); + } + public CompletableFuture<Void> handleOnce() { + return lastSendFuture.handle((ignore, t) -> { + if (t != null && THROW_ONCE_UPDATER.compareAndSet(this, FALSE, TRUE)) { + throw FutureUtil.wrapToCompletionException(t); + } + return null; + }); + } + } + + @Override public CompletableFuture<Void> closeAsync() { final State currentState = getAndUpdateState(state -> { @@ -1840,14 +1867,17 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne @Override public CompletableFuture<Void> flushAsync() { - CompletableFuture<MessageId> lastSendFuture; synchronized (ProducerImpl.this) { if (isBatchMessagingEnabled()) { batchMessageAndSend(); } - lastSendFuture = this.lastSendFuture; + CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture; + if (!(lastSendFuture == this.lastSendFutureWrapper.lastSendFuture)) { + this.lastSendFutureWrapper = 
LastSendFutureWrapper.create(lastSendFuture); + } } - return lastSendFuture.thenApply(ignored -> null); + + return this.lastSendFutureWrapper.handleOnce(); } @Override @@ -2055,5 +2085,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return errorState; } + @VisibleForTesting + CompletableFuture<Void> getOriginalLastSendFuture() { + CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture; + return lastSendFuture.thenApply(ignore -> null); + } + private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class); }
