This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 180e5f26e4afdfda178e7253fb22a8b21b2c7f54 Author: feynmanlin <[email protected]> AuthorDate: Tue Jul 28 14:02:34 2020 +0800 fix batchReceiveAsync not completed exceptionally when closing Consumer (#7661) ### Motivation CompletableFuture<Messages<T>> from Consumer.batchReceiveAsync() not completed exceptionnally when closing Consumer. ### Modifications pendingBatchReceives was not cleaned up when the connection was closed, so I added pendingBatchReceives cleanup. (cherry picked from commit 48156ad9a5c2e0d85813367bcaaf6ea845fffc2c) --- .../client/api/SimpleProducerConsumerTest.java | 74 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 26 +++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 34 +++------- .../client/impl/MultiTopicsConsumerImpl.java | 11 +--- 4 files changed, 111 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index dce6c10..669e259 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -3352,4 +3352,78 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } log.info("-- Exiting {} test --", methodName); } + + @Test(timeOut = 5000) + public void testReceiveAsyncCompletedWhenClosing() throws Exception { + final String topic = "persistent://my-property/my-ns/testCompletedWhenClosing"; + final String partitionedTopic = "persistent://my-property/my-ns/testCompletedWhenClosing-partitioned"; + final String errorMsg = "cleaning and closing the consumers"; + BatchReceivePolicy batchReceivePolicy + = BatchReceivePolicy.builder().maxNumBytes(10 * 1024).maxNumMessages(10).timeout(-1, TimeUnit.SECONDS).build(); + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic).subscriptionName("my-subscriber-name") + .batchReceivePolicy(batchReceivePolicy).subscribe(); + // 1) Test receiveAsync is interrupted + CountDownLatch countDownLatch = new CountDownLatch(1); + new Thread(() -> { + try { + consumer.receiveAsync().get(); + Assert.fail("should be interrupted"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(errorMsg)); + countDownLatch.countDown(); + } + }).start(); + new Thread(() -> { + try { + consumer.close(); + } catch (PulsarClientException ignore) { + } + }).start(); + countDownLatch.await(); + + // 2) Test batchReceiveAsync is interrupted + CountDownLatch countDownLatch2 = new CountDownLatch(1); + Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING) + .topic(topic).subscriptionName("my-subscriber-name") + .batchReceivePolicy(batchReceivePolicy).subscribe(); + new Thread(() -> { + try { + consumer2.batchReceiveAsync().get(); + Assert.fail("should be interrupted"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(errorMsg)); + countDownLatch2.countDown(); + } + }).start(); + new Thread(() -> { + try { + consumer2.close(); + } catch (PulsarClientException ignore) { + } + }).start(); + countDownLatch2.await(); + // 3) Test partitioned topic batchReceiveAsync is interrupted + CountDownLatch countDownLatch3 = new CountDownLatch(1); + admin.topics().createPartitionedTopic(partitionedTopic, 3); + Consumer<String> partitionedTopicConsumer = pulsarClient.newConsumer(Schema.STRING) + .topic(partitionedTopic).subscriptionName("my-subscriber-name-partitionedTopic") + .batchReceivePolicy(batchReceivePolicy).subscribe(); + new Thread(() -> { + try { + partitionedTopicConsumer.batchReceiveAsync().get(); + Assert.fail("should be interrupted"); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(errorMsg)); + countDownLatch3.countDown(); + } + }).start(); + new Thread(() -> { + try { + partitionedTopicConsumer.close(); + } catch (PulsarClientException ignore) { + } + }).start(); + countDownLatch3.await(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 3d45931..5391cb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -195,6 +195,30 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T } } + protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) { + while (!pendingReceives.isEmpty()) { + CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); + if (receiveFuture == null) { + break; + } + receiveFuture.completeExceptionally( + new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " + + "was already closed when cleaning and closing the consumers", topic, subscription))); + } + } + + protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) { + while (!pendingBatchReceives.isEmpty()) { + OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll(); + if (opBatchReceive == null || opBatchReceive.future == null) { + break; + } + opBatchReceive.future.completeExceptionally( + new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " + + "was already closed when cleaning and closing the consumers", topic, subscription))); + } + } + abstract protected Messages<T> internalBatchReceive() throws PulsarClientException; abstract protected CompletableFuture<Messages<T>> internalBatchReceiveAsync(); @@ -405,7 +429,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T TransactionImpl txn); protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, - Map<String,Long> properties, + Map<String,Long> properties, long delayTime, TimeUnit unit); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1757f1e..10320da 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -25,7 +25,6 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -40,8 +39,6 @@ import java.util.BitSet; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -84,7 +81,6 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.RetryMessageUtil; -import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; @@ -172,9 +168,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private volatile Producer<T> retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); - + protected volatile boolean paused; - + protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>(); private int pendingChunckedMessageCount = 0; protected long expireTimeOfIncompleteChunkedMessageMillis = 0; @@ -560,7 +556,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle @SuppressWarnings("unchecked") @Override protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, - Map<String,Long> properties, + Map<String,Long> properties, long delayTime, TimeUnit unit) { MessageId messageId = message.getMessageId(); @@ -620,7 +616,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) { reconsumetimes = Integer.valueOf(propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)); reconsumetimes = reconsumetimes + 1; - + } else { propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr); @@ -628,7 +624,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime))); - + if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) { processPossibleToDLQ((MessageIdImpl)messageId); if (deadLetterProducer == null) { @@ -996,18 +992,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle lock.readLock().lock(); try { if (listenerExecutor != null && !listenerExecutor.isShutdown()) { - while (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); - if (receiveFuture != null) { - receiveFuture.completeExceptionally( - new PulsarClientException.AlreadyClosedException( - String.format("The consumer which subscribes the topic %s with subscription name %s " + - "was already closed when cleaning and closing the consumers", - topicName.toString(), subscription))); - } else { - break; - } - } + failPendingReceives(this.pendingReceives); + failPendingBatchReceives(this.pendingBatchReceives); } } finally { lock.readLock().unlock(); @@ -1083,7 +1069,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - + // right now, chunked messages are only supported by non-shared subscription if (isChunkedMessage) { uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx); @@ -1152,7 +1138,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle TimeUnit.MILLISECONDS); expireChunkMessageTaskScheduled = true; } - + if (msgMetadata.getChunkId() == 0) { ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(), msgMetadata.getTotalChunkMsgSize()); @@ -1222,7 +1208,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle compressedPayload.release(); return uncompressedPayload; } - + protected void triggerListener(int numMessages) { // Trigger the notification on the message listener in a separate thread to avoid blocking the networking // thread while the message processing happens diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 7f44836..50e4db9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -566,15 +566,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { lock.readLock().lock(); try { if (listenerExecutor != null && !listenerExecutor.isShutdown()) { - while (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); - if (receiveFuture != null) { - receiveFuture.completeExceptionally( - new PulsarClientException.AlreadyClosedException("Consumer is already closed")); - } else { - break; - } - } + failPendingReceives(pendingReceives); + failPendingBatchReceives(pendingBatchReceives); } } finally { lock.readLock().unlock();
