This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ddca852 Fix PartitionedProducerImpl flushAsync always fail when one
partition send TimeOutException (#14602)
ddca852 is described below
commit ddca8521ed30aaa719d64077b65c4a24fb82ce5c
Author: wenbingshen <[email protected]>
AuthorDate: Tue Mar 15 18:21:56 2022 +0800
Fix PartitionedProducerImpl flushAsync always fail when one partition send
TimeOutException (#14602)
Fixes #14598
Master Issue: #14598
### Motivation
Detailed issue description can be found at
https://github.com/apache/pulsar/issues/14598
### Modifications
Once the application has obtained the result of the `lastSendFuture` returned
by `org.apache.pulsar.client.impl.ProducerImpl#flushAsync`, that same result
should not be thrown to the application again. Whether the `lastSendFuture`
completed normally or with an exception, the application is only allowed to
observe it once; otherwise
`org.apache.pulsar.client.impl.PartitionedProducerImpl#flushAsync` keeps
failing, and the application cannot continue to send data, until data is sent
again to the abnormal `ProducerImpl`.
---
.../client/api/SimpleProducerConsumerTest.java | 78 ++++++++++++++++++++++
.../client/impl/PartitionedProducerImpl.java | 7 ++
.../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++-
3 files changed, 124 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 368652c..9596cbf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
@@ -606,6 +607,83 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ @Test(dataProvider = "batch")
+ public void testSendTimeoutAndRecover(int batchMessageDelayMs) throws
Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ int numPartitions = 6;
+ TopicName topicName =
TopicName.get("persistent://my-property/my-ns/sendTimeoutAndRecover-1");
+ admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+
+ @Cleanup
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+ .subscriptionName("my-subscriber-name").subscribe();
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+ .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS);
+
+ if (batchMessageDelayMs != 0) {
+ producerBuilder.enableBatching(true);
+ producerBuilder.batchingMaxPublishDelay(batchMessageDelayMs,
TimeUnit.MILLISECONDS);
+ producerBuilder.batchingMaxMessages(5);
+ }
+
+ @Cleanup
+ PartitionedProducerImpl<byte[]> partitionedProducer =
+ (PartitionedProducerImpl<byte[]>) producerBuilder.create();
+ final String message = "my-message";
+ // 1. Trigger the send timeout
+ stopBroker();
+
+ partitionedProducer.sendAsync(message.getBytes());
+
+ String exceptionMessage = "";
+ try {
+ // 2. execute flush to get results,
+ // it should be failed because step 1
+ partitionedProducer.flush();
+ Assert.fail("Send operation should have failed");
+ } catch (PulsarClientException e) {
+ exceptionMessage = e.getMessage();
+ }
+
+ // 3. execute flush to get results,
+ // it shouldn't fail because we already handled the exception in the
step 2, unless we keep sending data.
+ partitionedProducer.flush();
+ // 4. execute flushAsync, we only catch the exception once,
+ // but by getting the original lastSendFuture twice below,
+ // the same exception information must be caught twice to verify that
our handleOnce works as expected.
+ try {
+ partitionedProducer.getOriginalLastSendFuture().get();
+ Assert.fail("Send operation should have failed");
+ } catch (Exception e) {
+ Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(),
exceptionMessage);
+ }
+ try {
+ partitionedProducer.getOriginalLastSendFuture().get();
+ Assert.fail("Send operation should have failed");
+ } catch (Exception e) {
+ Assert.assertEquals(PulsarClientException.unwrap(e).getMessage(),
exceptionMessage);
+ }
+
+ startBroker();
+
+ // 5. We should not have received any message
+ Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ Assert.assertNull(msg);
+
+ // 6. We keep sending data after connection reconnected.
+ partitionedProducer.sendAsync(message.getBytes());
+ // 7. This flush operation must succeed.
+ partitionedProducer.flush();
+
+ // 8. We should have received message
+ msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals(new String(msg.getData()), message);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
@Test
public void testInvalidSequence() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index b9f64e5..47ce599 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -481,6 +481,13 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
return partitionsAutoUpdateTimeout;
}
+ @VisibleForTesting
+ public CompletableFuture<Void> getOriginalLastSendFuture() {
+ return CompletableFuture.allOf(
+
producers.values().stream().map(ProducerImpl::getOriginalLastSendFuture)
+ .toArray(CompletableFuture[]::new));
+ }
+
@Override
public int getNumOfPartitions() {
return topicMetadata.numPartitions();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a322c0a..d1976af 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
@@ -117,6 +118,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
private final BatchMessageContainerBase batchMessageContainer;
private CompletableFuture<MessageId> lastSendFuture =
CompletableFuture.completedFuture(null);
+ private LastSendFutureWrapper lastSendFutureWrapper =
LastSendFutureWrapper.create(lastSendFuture);
// Globally unique producer name
private String producerName;
@@ -940,6 +942,31 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
};
}
+ private static final class LastSendFutureWrapper {
+ private final CompletableFuture<MessageId> lastSendFuture;
+ private static final int FALSE = 0;
+ private static final int TRUE = 1;
+ private static final AtomicIntegerFieldUpdater<LastSendFutureWrapper>
THROW_ONCE_UPDATER =
+
AtomicIntegerFieldUpdater.newUpdater(LastSendFutureWrapper.class, "throwOnce");
+ private volatile int throwOnce = FALSE;
+
+ private LastSendFutureWrapper(CompletableFuture<MessageId>
lastSendFuture) {
+ this.lastSendFuture = lastSendFuture;
+ }
+ static LastSendFutureWrapper create(CompletableFuture<MessageId>
lastSendFuture) {
+ return new LastSendFutureWrapper(lastSendFuture);
+ }
+ public CompletableFuture<Void> handleOnce() {
+ return lastSendFuture.handle((ignore, t) -> {
+ if (t != null && THROW_ONCE_UPDATER.compareAndSet(this, FALSE,
TRUE)) {
+ throw FutureUtil.wrapToCompletionException(t);
+ }
+ return null;
+ });
+ }
+ }
+
+
@Override
public CompletableFuture<Void> closeAsync() {
final State currentState = getAndUpdateState(state -> {
@@ -1976,14 +2003,17 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
@Override
public CompletableFuture<Void> flushAsync() {
- CompletableFuture<MessageId> lastSendFuture;
synchronized (ProducerImpl.this) {
if (isBatchMessagingEnabled()) {
batchMessageAndSend();
}
- lastSendFuture = this.lastSendFuture;
+ CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture;
+ if (!(lastSendFuture ==
this.lastSendFutureWrapper.lastSendFuture)) {
+ this.lastSendFutureWrapper =
LastSendFutureWrapper.create(lastSendFuture);
+ }
}
- return lastSendFuture.thenApply(ignored -> null);
+
+ return this.lastSendFutureWrapper.handleOnce();
}
@Override
@@ -2215,5 +2245,11 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return errorState;
}
+ @VisibleForTesting
+ CompletableFuture<Void> getOriginalLastSendFuture() {
+ CompletableFuture<MessageId> lastSendFuture = this.lastSendFuture;
+ return lastSendFuture.thenApply(ignore -> null);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ProducerImpl.class);
}
\ No newline at end of file