This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 48156ad fix batchReceiveAsync not completed exceptionally when
closing Consumer (#7661)
48156ad is described below
commit 48156ad9a5c2e0d85813367bcaaf6ea845fffc2c
Author: feynmanlin <[email protected]>
AuthorDate: Tue Jul 28 14:02:34 2020 +0800
fix batchReceiveAsync not completed exceptionally when closing Consumer
(#7661)
### Motivation
CompletableFuture<Messages<T>> from Consumer.batchReceiveAsync() not
completed exceptionnally when closing Consumer.
### Modifications
pendingBatchReceives was not cleaned up when the connection was closed, so
I added pendingBatchReceives cleanup.
---
.../client/api/SimpleProducerConsumerTest.java | 74 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 26 +++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 34 +++-------
.../client/impl/MultiTopicsConsumerImpl.java | 11 +---
4 files changed, 111 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index dce6c10..669e259 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -3352,4 +3352,78 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
}
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(timeOut = 5000)
+ public void testReceiveAsyncCompletedWhenClosing() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/testCompletedWhenClosing";
+ final String partitionedTopic =
"persistent://my-property/my-ns/testCompletedWhenClosing-partitioned";
+ final String errorMsg = "cleaning and closing the consumers";
+ BatchReceivePolicy batchReceivePolicy
+ = BatchReceivePolicy.builder().maxNumBytes(10 *
1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName("my-subscriber-name")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ // 1) Test receiveAsync is interrupted
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ consumer.receiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ consumer.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch.await();
+
+ // 2) Test batchReceiveAsync is interrupted
+ CountDownLatch countDownLatch2 = new CountDownLatch(1);
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName("my-subscriber-name")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ new Thread(() -> {
+ try {
+ consumer2.batchReceiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch2.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ consumer2.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch2.await();
+ // 3) Test partitioned topic batchReceiveAsync is interrupted
+ CountDownLatch countDownLatch3 = new CountDownLatch(1);
+ admin.topics().createPartitionedTopic(partitionedTopic, 3);
+ Consumer<String> partitionedTopicConsumer =
pulsarClient.newConsumer(Schema.STRING)
+
.topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic")
+ .batchReceivePolicy(batchReceivePolicy).subscribe();
+ new Thread(() -> {
+ try {
+ partitionedTopicConsumer.batchReceiveAsync().get();
+ Assert.fail("should be interrupted");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(errorMsg));
+ countDownLatch3.countDown();
+ }
+ }).start();
+ new Thread(() -> {
+ try {
+ partitionedTopicConsumer.close();
+ } catch (PulsarClientException ignore) {
+ }
+ }).start();
+ countDownLatch3.await();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index cdc3f5f..0b329e6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -195,6 +195,30 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
}
+ protected void
failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives) {
+ while (!pendingReceives.isEmpty()) {
+ CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
+ if (receiveFuture == null) {
+ break;
+ }
+ receiveFuture.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing the
consumers", topic, subscription)));
+ }
+ }
+
+ protected void
failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>>
pendingBatchReceives) {
+ while (!pendingBatchReceives.isEmpty()) {
+ OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+ if (opBatchReceive == null || opBatchReceive.future == null) {
+ break;
+ }
+ opBatchReceive.future.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing the
consumers", topic, subscription)));
+ }
+ }
+
abstract protected Messages<T> internalBatchReceive() throws
PulsarClientException;
abstract protected CompletableFuture<Messages<T>>
internalBatchReceiveAsync();
@@ -405,7 +429,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
TransactionImpl
txn);
protected abstract CompletableFuture<Void> doReconsumeLater(Message<?>
message, AckType ackType,
-
Map<String,Long> properties,
+
Map<String,Long> properties,
long delayTime,
TimeUnit unit);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76e0726..148cf84 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,7 +25,6 @@ import static
org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -40,8 +39,6 @@ import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -84,7 +81,6 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -173,9 +169,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new
ReentrantReadWriteLock();
-
+
protected volatile boolean paused;
-
+
protected ConcurrentOpenHashMap<String, ChunkedMessageCtx>
chunkedMessagesMap = new ConcurrentOpenHashMap<>();
private int pendingChunckedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
@@ -561,7 +557,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@SuppressWarnings("unchecked")
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message,
AckType ackType,
- Map<String,Long>
properties,
+ Map<String,Long>
properties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
@@ -621,7 +617,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if
(propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumetimes =
Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumetimes = reconsumetimes + 1;
-
+
} else {
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC,
originTopicNameStr);
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID,
originMessageIdStr);
@@ -629,7 +625,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES,
String.valueOf(reconsumetimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
-
+
if (reconsumetimes >
this.deadLetterPolicy.getMaxRedeliverCount()) {
processPossibleToDLQ((MessageIdImpl)messageId);
if (deadLetterProducer == null) {
@@ -997,18 +993,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new PulsarClientException.AlreadyClosedException(
- String.format("The consumer which subscribes
the topic %s with subscription name %s " +
- "was already closed when cleaning and
closing the consumers",
- topicName.toString(), subscription)));
- } else {
- break;
- }
- }
+ failPendingReceives(this.pendingReceives);
+ failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
@@ -1084,7 +1070,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// if message is not decryptable then it can't be parsed as a
batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 &&
!msgMetadata.hasNumMessagesInBatch())) {
-
+
// right now, chunked messages are only supported by non-shared
subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload,
msgMetadata, msgId, messageId, cnx);
@@ -1153,7 +1139,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
TimeUnit.MILLISECONDS);
expireChunkMessageTaskScheduled = true;
}
-
+
if (msgMetadata.getChunkId() == 0) {
ByteBuf chunkedMsgBuffer =
Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
msgMetadata.getTotalChunkMsgSize());
@@ -1223,7 +1209,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
compressedPayload.release();
return uncompressedPayload;
}
-
+
protected void triggerListener(int numMessages) {
// Trigger the notification on the message listener in a separate
thread to avoid blocking the networking
// thread while the message processing happens
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 7529ea5..4b9e870 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -566,15 +566,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
lock.readLock().lock();
try {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
- while (!pendingReceives.isEmpty()) {
- CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
- if (receiveFuture != null) {
- receiveFuture.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
- } else {
- break;
- }
- }
+ failPendingReceives(pendingReceives);
+ failPendingBatchReceives(pendingBatchReceives);
}
} finally {
lock.readLock().unlock();