This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11d4d07  [Issue 8300][Java client] Support cancelling message & batch 
futures returned from Reader & Consumer (#8326)
11d4d07 is described below

commit 11d4d077f5bb009297b32e19b3eb1ace3660d0c5
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 26 10:13:15 2020 +0200

    [Issue 8300][Java client] Support cancelling message & batch futures 
returned from Reader & Consumer (#8326)
    
    Fixes #8300, #8307
    
    ### Motivation
    
    Explained in #8300
    
    ### Modifications
    
    The futures returned by 
`org.apache.pulsar.client.api.Reader.readNextAsync`, 
`org.apache.pulsar.client.api.Consumer.receiveAsync` and 
`org.apache.pulsar.client.api.Consumer.batchReceiveAsync` methods
    return a `CompletableFuture` that contains a "whenComplete" callback which 
handles `CancellationException & TimeoutException` . The callback action will 
remove the registered futures from the internal "pendingReceives" and 
"pendingBatchReceives" futures.
    The changes attempt to also against possible race conditions that could 
happen when cancellation and a message get received at the exactly same moment 
and the future removed from the queue has already been cancelled.
    
    While adding unit tests, some minor refactoring was made to start adding a 
ClientTestFixtures class that would make it easier to share some test mocks 
that are used in the client internal unit tests. The intention of this change 
was to start lowering the barrier for adding better test coverage for further 
changes.
---
 .../org/apache/pulsar/client/api/Consumer.java     |  14 +++
 .../java/org/apache/pulsar/client/api/Reader.java  |   9 ++
 .../impl/CompletableFutureCancellationHandler.java | 106 ++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 120 ++++++++++++++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  22 ++--
 .../client/impl/MultiTopicsConsumerImpl.java       |  15 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   9 +-
 .../pulsar/client/impl/ClientTestFixtures.java     |  78 ++++++++++++++
 .../CompletableFutureCancellationHandlerTest.java  |  56 ++++++++++
 .../pulsar/client/impl/ConsumerImplTest.java       |  59 ++++++----
 .../client/impl/MultiTopicsConsumerImplTest.java   |  92 ++++++++--------
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  57 ++++++++++
 12 files changed, 533 insertions(+), 104 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 801896c..4827a8e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -88,6 +88,12 @@ public interface Consumer<T> extends Closeable {
      * <p>{@code receiveAsync()} should be called subsequently once returned 
{@code CompletableFuture} gets complete
      * with received message. Else it creates <i> backlog of receive requests 
</i> in the application.
      *
+     * <p>The returned future can be cancelled before completion by calling 
{@code .cancel(false)}
+     * ({@link CompletableFuture#cancel(boolean)}) to remove it from the the 
backlog of receive requests. Another
+     * choice for ensuring a proper clean up of the returned future is to use 
the CompletableFuture.orTimeout method
+     * which is available on JDK9+. That would remove it from the backlog of 
receive requests if receiving exceeds
+     * the timeout.
+     *
      * @return {@link CompletableFuture}<{@link Message}> will be completed 
when message is available
      */
     CompletableFuture<Message<T>> receiveAsync();
@@ -129,6 +135,14 @@ public interface Consumer<T> extends Closeable {
      * {@code batchReceiveAsync()} should be called subsequently once returned 
{@code CompletableFuture} gets complete
      * with received messages. Else it creates <i> backlog of receive requests 
</i> in the application.
      * </p>
+     *
+     * <p>The returned future can be cancelled before completion by calling 
{@code .cancel(false)}
+     * ({@link CompletableFuture#cancel(boolean)}) to remove it from the the 
backlog of receive requests. Another
+     * choice for ensuring a proper clean up of the returned future is to use 
the CompletableFuture.orTimeout method
+     * which is available on JDK9+. That would remove it from the backlog of 
receive requests if receiving exceeds
+     * the timeout.
+     *
+     *
      * @return messages
      * @since 2.4.1
      * @throws PulsarClientException
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index b7cb96a..492a1cf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -55,6 +55,15 @@ public interface Reader<T> extends Closeable {
     /**
      * Read asynchronously the next message in the topic.
      *
+     * <p>{@code readNextAsync()} should be called subsequently once returned 
{@code CompletableFuture} gets complete
+     * with received message. Else it creates <i> backlog of receive requests 
</i> in the application.
+     *
+     * <p>The returned future can be cancelled before completion by calling 
{@code .cancel(false)}
+     * ({@link CompletableFuture#cancel(boolean)}) to remove it from the the 
backlog of receive requests. Another
+     * choice for ensuring a proper clean up of the returned future is to use 
the CompletableFuture.orTimeout method
+     * which is available on JDK9+. That would remove it from the backlog of 
receive requests if receiving exceeds
+     * the timeout.
+     *
      * @return a future that will yield a message (when it's available) or 
{@link PulsarClientException} if the reader
      *         is already closed.
      */
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
new file mode 100644
index 0000000..1deb387
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+/**
+ * Implements cancellation and timeout support for CompletableFutures.
+ * <p>
+ * This class ensures that the cancel action gets called once after the future 
completes with
+ * either {@link CancellationException} or {@link TimeoutException}.
+ * The implementation handles possible race conditions that
+ * might happen when the future gets cancelled before the cancel action is set 
to this handler.
+ * <p>
+ * For timeouts, CompletableFuture's "orTimeout" method introduced in JDK9
+ * can be used in client code.
+ * <p>
+ * Cancellation and timeout support will only be active on the future where the
+ * cancellation handler has been attached to. Cancellation won't happen if 
.cancel is called on
+ * any "downstream" dependent futures. A cancellation or timeout that happens 
in any "upstream"
+ * future will get handled.
+ */
+class CompletableFutureCancellationHandler {
+    private volatile boolean cancelled;
+    private volatile Runnable cancelAction;
+    private final AtomicBoolean cancelHandled = new AtomicBoolean();
+
+    /**
+     * Creates a new {@link CompletableFuture} and attaches the cancellation 
handler
+     * to handle cancels and timeouts.
+     *
+     * @param <T> the result type of the future
+     * @return a new future instance
+     */
+    public <T> CompletableFuture<T> createFuture() {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        attachToFuture(future);
+        return future;
+    }
+
+    /**
+     * Attaches the cancellation handler to handle cancels
+     * and timeouts
+     *
+     * @param future the future to attach the handler to
+     * @param <T>    the result type of the future
+     */
+    public <T> void attachToFuture(CompletableFuture<T> future) {
+        future.whenComplete(whenCompleteFunction());
+    }
+
+    /**
+     * Set the action to run when the future gets cancelled or timeouts.
+     * The cancellation or timeout might be originating from any "upstream" 
future.
+     * The implementation ensures that the cancel action gets called once.
+     * Handles possible race conditions that might happen when the future gets 
cancelled
+     * before the cancel action is set to this handler. In this case, the
+     * cancel action gets called when the action is set.
+     *
+     * @param cancelAction the action to run when the the future gets 
cancelled or timeouts
+     */
+    public void setCancelAction(Runnable cancelAction) {
+        if (this.cancelAction != null) {
+            throw new IllegalStateException("cancelAction can only be set 
once.");
+        }
+        this.cancelAction = Objects.requireNonNull(cancelAction);
+        // handle race condition in the case that the future was already 
cancelled when the handler is set
+        runCancelActionOnceIfCancelled();
+    }
+
+    private <T> BiConsumer<? super T, ? super Throwable> 
whenCompleteFunction() {
+        return (T t, Throwable throwable) -> {
+            if (throwable instanceof CancellationException || throwable 
instanceof TimeoutException) {
+                cancelled = true;
+            }
+            runCancelActionOnceIfCancelled();
+        };
+    }
+
+    private void runCancelActionOnceIfCancelled() {
+        if (cancelled && cancelAction != null && 
cancelHandled.compareAndSet(false, true)) {
+            cancelAction.run();
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 8c67f2c..1c40449 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -199,15 +199,57 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         }
     }
 
+    protected CompletableFuture<Message<T>> peekPendingReceive() {
+        CompletableFuture<Message<T>> receivedFuture = null;
+        while (receivedFuture == null) {
+            receivedFuture = pendingReceives.peek();
+            if (receivedFuture == null) {
+                break;
+            }
+            // skip done futures (cancelling a future could mark it done)
+            if (receivedFuture.isDone()) {
+                CompletableFuture<Message<T>> removed = pendingReceives.poll();
+                if (removed != receivedFuture) {
+                    log.error("Bug! Removed future wasn't the expected one. 
expected={} removed={}", receivedFuture, removed);
+                }
+                receivedFuture = null;
+            }
+        }
+        return receivedFuture;
+    }
+
+    protected CompletableFuture<Message<T>> pollPendingReceive() {
+        CompletableFuture<Message<T>> receivedFuture;
+        while (true) {
+            receivedFuture = pendingReceives.poll();
+            // skip done futures (cancelling a future could mark it done)
+            if (receivedFuture == null || !receivedFuture.isDone()) {
+                break;
+            }
+        }
+        return receivedFuture;
+    }
+
+    protected void completePendingReceive(CompletableFuture<Message<T>> 
receivedFuture, Message<T> message) {
+        listenerExecutor.execute(() -> {
+            if (!receivedFuture.complete(message)) {
+                log.warn("Race condition detected. receive future was already 
completed (cancelled={}) and message was dropped. message={}",
+                        receivedFuture.isCancelled(), message);
+            }
+        });
+    }
+
     protected void 
failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives) {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = 
pendingReceives.poll();
             if (receiveFuture == null) {
                 break;
             }
-            receiveFuture.completeExceptionally(
-                    new 
PulsarClientException.AlreadyClosedException(String.format("The consumer which 
subscribes the topic %s with subscription name %s " +
-                            "was already closed when cleaning and closing the 
consumers", topic, subscription)));
+            if (!receiveFuture.isDone()) {
+                receiveFuture.completeExceptionally(
+                        new 
PulsarClientException.AlreadyClosedException(String.format("The consumer which 
subscribes the topic %s with subscription name %s " +
+                                "was already closed when cleaning and closing 
the consumers", topic, subscription)));
+            }
         }
     }
 
@@ -217,9 +259,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             if (opBatchReceive == null || opBatchReceive.future == null) {
                 break;
             }
-            opBatchReceive.future.completeExceptionally(
-                    new 
PulsarClientException.AlreadyClosedException(String.format("The consumer which 
subscribes the topic %s with subscription name %s " +
-                            "was already closed when cleaning and closing the 
consumers", topic, subscription)));
+            if (!opBatchReceive.future.isDone()) {
+                opBatchReceive.future.completeExceptionally(
+                        new 
PulsarClientException.AlreadyClosedException(String.format("The consumer which 
subscribes the topic %s with subscription name %s " +
+                                "was already closed when cleaning and closing 
the consumers", topic, subscription)));
+            }
         }
     }
 
@@ -677,8 +721,8 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected void notifyPendingBatchReceivedCallBack() {
-        final OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
-        if (opBatchReceive == null || opBatchReceive.future == null) {
+        OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
+        if (opBatchReceive == null) {
             return;
         }
         try {
@@ -689,7 +733,44 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
-    protected void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> 
opBatchReceive) {
+    private OpBatchReceive<T> peekNextBatchReceive() {
+        OpBatchReceive<T> opBatchReceive = null;
+        while (opBatchReceive == null) {
+            opBatchReceive = pendingBatchReceives.peek();
+            // no entry available
+            if (opBatchReceive == null) {
+                return null;
+            }
+            // remove entries where future is null or has been completed 
(cancel / timeout)
+            if (opBatchReceive.future == null || 
opBatchReceive.future.isDone()) {
+                OpBatchReceive<T> removed = pendingBatchReceives.poll();
+                if (removed != opBatchReceive) {
+                    log.error("Bug: Removed entry wasn't the expected one. 
expected={}, removed={}", opBatchReceive, removed);
+                }
+                opBatchReceive = null;
+            }
+        }
+        return opBatchReceive;
+    }
+
+
+    private OpBatchReceive<T> pollNextBatchReceive() {
+        OpBatchReceive<T> opBatchReceive = null;
+        while (opBatchReceive == null) {
+            opBatchReceive = pendingBatchReceives.poll();
+            // no entry available
+            if (opBatchReceive == null) {
+                return null;
+            }
+            // skip entries where future is null or has been completed (cancel 
/ timeout)
+            if (opBatchReceive.future == null || 
opBatchReceive.future.isDone()) {
+                opBatchReceive = null;
+            }
+        }
+        return opBatchReceive;
+    }
+
+    protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> 
opBatchReceive) {
         MessagesImpl<T> messages = getNewMessagesImpl();
         Message<T> msgPeeked = incomingMessages.peek();
         while (msgPeeked != null && messages.canAdd(msgPeeked)) {
@@ -701,7 +782,14 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             }
             msgPeeked = incomingMessages.peek();
         }
-        opBatchReceive.future.complete(messages);
+        completePendingBatchReceive(opBatchReceive.future, messages);
+    }
+
+    protected void completePendingBatchReceive(CompletableFuture<Messages<T>> 
future, Messages<T> messages) {
+        if (!future.complete(messages)) {
+            log.warn("Race condition detected. batch receive future was 
already completed (cancelled={}) and messages were dropped. messages={}",
+                    future.isCancelled(), messages);
+        }
     }
 
     protected abstract void messageProcessed(Message<?> msg);
@@ -722,7 +810,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             if (pendingBatchReceives == null) {
                 pendingBatchReceives = Queues.newConcurrentLinkedQueue();
             }
-            OpBatchReceive<T> firstOpBatchReceive = 
pendingBatchReceives.peek();
+            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
 
             while (firstOpBatchReceive != null) {
@@ -733,9 +821,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
                 if (diff <= 0) {
                     // The diff is less than or equal to zero, meaning that 
the batch receive has been timed out.
                     // complete the OpBatchReceive and continue to check the 
next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pendingBatchReceives.poll();
-                    completeOpBatchReceive(op);
-                    firstOpBatchReceive = pendingBatchReceives.peek();
+                    OpBatchReceive<T> op = pollNextBatchReceive();
+                    if (op != null) {
+                        completeOpBatchReceive(op);
+                    }
+                    firstOpBatchReceive = peekNextBatchReceive();
                 } else {
                     // The diff is greater than zero, set the timeout to the 
diff value
                     timeToWaitMs = diff;
@@ -752,7 +842,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     }
 
     protected boolean hasPendingBatchReceive() {
-        return pendingBatchReceives != null && !pendingBatchReceives.isEmpty();
+        return pendingBatchReceives != null && peekNextBatchReceive() != null;
     }
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c73d2f1..73d5bb4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -89,7 +89,6 @@ import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
@@ -109,7 +108,6 @@ import 
org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
-import 
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -400,14 +398,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
-
-        CompletableFuture<Message<T>> result = new CompletableFuture<>();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Message<T>> result = 
cancellationHandler.createFuture();
         Message<T> message = null;
         try {
             lock.writeLock().lock();
             message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
             if (message == null) {
                 pendingReceives.add(result);
+                cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -462,7 +461,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     @Override
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
-        CompletableFuture<Messages<T>> result = new CompletableFuture<>();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Messages<T>> result = 
cancellationHandler.createFuture();
         try {
             lock.writeLock().lock();
             if (pendingBatchReceives == null) {
@@ -482,7 +482,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
                 result.complete(messages);
             } else {
-                pendingBatchReceives.add(OpBatchReceive.of(result));
+                OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
+                pendingBatchReceives.add(opBatchReceive);
+                cancellationHandler.setCancelAction(() -> 
pendingBatchReceives.remove(opBatchReceive));
             }
         } finally {
             lock.writeLock().unlock();
@@ -1196,7 +1198,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (deadLetterPolicy != null && 
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= 
deadLetterPolicy.getMaxRedeliverCount()) {
                     
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
 Collections.singletonList(message));
                 }
-                if (!pendingReceives.isEmpty()) {
+                if (peekPendingReceive() != null) {
                     notifyPendingReceivedCallback(message, null);
                 } else if (enqueueMessageAndCheckBatchReceive(message)) {
                     if (hasPendingBatchReceive()) {
@@ -1349,7 +1351,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = 
pendingReceives.poll();
+        final CompletableFuture<Message<T>> receivedFuture = 
pollPendingReceive();
         if (receivedFuture == null) {
             return;
         }
@@ -1382,7 +1384,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         // call proper interceptor
         final Message<T> interceptMessage = beforeConsume(message);
         // return message to receivedCallback
-        listenerExecutor.execute(() -> 
receivedFuture.complete(interceptMessage));
+        completePendingReceive(receivedFuture, interceptMessage);
     }
 
     void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int 
redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
@@ -1458,7 +1460,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
                 lock.readLock().lock();
                 try {
-                    if (!pendingReceives.isEmpty()) {
+                    if (peekPendingReceive() != null) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message)) {
                         if (hasPendingBatchReceive()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 41bb237..bdaa94b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -243,10 +243,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         // if asyncReceive is waiting : return message to callback without 
adding to incomingMessages queue
-        CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
         if (receivedFuture != null) {
             unAckedMessageTracker.add(topicMessage.getMessageId());
-            listenerExecutor.execute(() -> 
receivedFuture.complete(topicMessage));
+            completePendingReceive(receivedFuture, topicMessage);
         } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && 
hasPendingBatchReceive()) {
             notifyPendingBatchReceivedCallBack();
         }
@@ -352,7 +352,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
-        CompletableFuture<Messages<T>> result = new CompletableFuture<>();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Messages<T>> result = 
cancellationHandler.createFuture();
         try {
             lock.writeLock().lock();
             if (pendingBatchReceives == null) {
@@ -372,7 +373,9 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 }
                 result.complete(messages);
             } else {
-                pendingBatchReceives.add(OpBatchReceive.of(result));
+                OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
+                pendingBatchReceives.add(opBatchReceive);
+                cancellationHandler.setCancelAction(() -> 
pendingBatchReceives.remove(opBatchReceive));
             }
             resumeReceivingFromPausedConsumersIfNeeded();
         } finally {
@@ -383,10 +386,12 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     protected CompletableFuture<Message<T>> internalReceiveAsync() {
-        CompletableFuture<Message<T>> result = new CompletableFuture<>();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Message<T>> result = 
cancellationHandler.createFuture();
         Message<T> message = incomingMessages.poll();
         if (message == null) {
             pendingReceives.add(result);
+            cancellationHandler.setCancelAction(() -> 
pendingReceives.remove(result));
         } else {
             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
-message.getData().length);
             checkState(message instanceof TopicMessageImpl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 4f172e3..79fc1c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -132,10 +132,13 @@ public class ReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        return consumer.receiveAsync().thenApply(msg -> {
-            consumer.acknowledgeCumulativeAsync(msg);
-            return msg;
+        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+        receiveFuture.whenComplete((msg, t) -> {
+           if (msg != null) {
+               consumer.acknowledgeCumulativeAsync(msg);
+           }
         });
+        return receiveFuture;
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
new file mode 100644
index 0000000..a1653c1
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoop;
+import io.netty.util.Timer;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.mockito.Mockito;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.*;
+
+class ClientTestFixtures {
+    public static ScheduledExecutorService SCHEDULER = 
Executors.newSingleThreadScheduledExecutor();
+
+    static <T> PulsarClientImpl createPulsarClientMock() {
+        PulsarClientImpl clientMock = mock(PulsarClientImpl.class, 
Mockito.RETURNS_DEEP_STUBS);
+
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        when(clientMock.getConfiguration()).thenReturn(clientConf);
+        when(clientMock.timer()).thenReturn(mock(Timer.class));
+
+        
when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+        
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
+
+        return clientMock;
+    }
+
+    static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx() {
+        return mockClientCnx(createPulsarClientMock());
+    }
+
+    static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) {
+        ClientCnx clientCnxMock = mock(ClientCnx.class, 
Mockito.RETURNS_DEEP_STUBS);
+        
when(clientCnxMock.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        when(clientCnxMock.sendRequestWithId(any(), anyLong()))
+                
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
+        
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
+        
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+        return clientMock;
+    }
+
+    static <T> CompletableFuture<T> createDelayedCompletedFuture(T result, int 
delayMillis) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        SCHEDULER.schedule(() -> future.complete(result), delayMillis, 
TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    public static ExecutorService createMockedExecutor() {
+        return mock(ExecutorService.class);
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
new file mode 100644
index 0000000..9582b73
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.testng.Assert.assertTrue;
+
+public class CompletableFutureCancellationHandlerTest {
+
+    @Test
+    public void callsCancelActionWhenCancelled() {
+        // given
+        AtomicBoolean called = new AtomicBoolean();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Object> future = cancellationHandler.createFuture();
+        cancellationHandler.setCancelAction(() -> called.set(true));
+        // when
+        future.cancel(false);
+        // then
+        assertTrue(called.get());
+    }
+
+    @Test
+    public void callsCancelActionWhenTimeoutHappens() {
+        // given
+        AtomicBoolean called = new AtomicBoolean();
+        CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
+        CompletableFuture<Object> future = cancellationHandler.createFuture();
+        cancellationHandler.setCancelAction(() -> called.set(true));
+        // when
+        future.completeExceptionally(new TimeoutException());
+        // then
+        assertTrue(called.get());
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index dcb2e36..75d7753 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,16 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import io.netty.util.Timer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
@@ -37,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -48,29 +46,24 @@ public class ConsumerImplTest {
 
 
     private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
-    private ConsumerImpl<ConsumerImpl> consumer;
+    private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
 
     @BeforeMethod
     public void setUp() {
         consumerConf = new ConsumerConfigurationData<>();
-        ClientConfigurationData clientConf = new ClientConfigurationData();
-        PulsarClientImpl client = mock(PulsarClientImpl.class);
-        CompletableFuture<ClientCnx> clientCnxFuture = new 
CompletableFuture<>();
-        CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new 
CompletableFuture<>();
-        String topic = "non-persistent://tenant/ns1/my-topic";
-
-        // Mock connection for grabCnx()
-        when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
+        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
+        ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
-        when(client.getConfiguration()).thenReturn(clientConf);
-        when(client.timer()).thenReturn(mock(Timer.class));
+        CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new 
CompletableFuture<>();
+        String topic = "non-persistent://tenant/ns1/my-topic";
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
                 executorService, -1, false, subscribeFuture, null, null, null,
                 true);
+        consumer.setState(HandlerState.State.Ready);
     }
 
     @Test(invocationTimeOut = 1000)
@@ -88,7 +81,7 @@ public class ConsumerImplTest {
 
     @Test(invocationTimeOut = 1000)
     public void testNotifyPendingReceivedCallback_CompleteWithException() {
-        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new 
CompletableFuture<>();
+        CompletableFuture<Message<byte[]>> receiveFuture = new 
CompletableFuture<>();
         consumer.pendingReceives.add(receiveFuture);
         Exception exception = new 
PulsarClientException.InvalidMessageException("some random exception");
         consumer.notifyPendingReceivedCallback(null, exception);
@@ -105,7 +98,7 @@ public class ConsumerImplTest {
 
     @Test(invocationTimeOut = 1000)
     public void 
testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() {
-        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new 
CompletableFuture<>();
+        CompletableFuture<Message<byte[]>> receiveFuture = new 
CompletableFuture<>();
         consumer.pendingReceives.add(receiveFuture);
         consumer.notifyPendingReceivedCallback(null, null);
 
@@ -120,15 +113,15 @@ public class ConsumerImplTest {
 
     @Test(invocationTimeOut = 1000)
     public void 
testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() {
-        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new 
CompletableFuture<>();
+        CompletableFuture<Message<byte[]>> receiveFuture = new 
CompletableFuture<>();
         MessageImpl message = mock(MessageImpl.class);
-        ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+        ConsumerImpl<byte[]> spy = spy(consumer);
 
         consumer.pendingReceives.add(receiveFuture);
         consumerConf.setReceiverQueueSize(0);
         doReturn(message).when(spy).beforeConsume(any());
         spy.notifyPendingReceivedCallback(message, null);
-        Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+        Message<byte[]> receivedMessage = receiveFuture.join();
 
         verify(spy, times(1)).beforeConsume(message);
         Assert.assertTrue(receiveFuture.isDone());
@@ -138,15 +131,15 @@ public class ConsumerImplTest {
 
     @Test(invocationTimeOut = 1000)
     public void testNotifyPendingReceivedCallback_WorkNormally() {
-        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new 
CompletableFuture<>();
+        CompletableFuture<Message<byte[]>> receiveFuture = new 
CompletableFuture<>();
         MessageImpl message = mock(MessageImpl.class);
-        ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+        ConsumerImpl<byte[]> spy = spy(consumer);
 
         consumer.pendingReceives.add(receiveFuture);
         doReturn(message).when(spy).beforeConsume(any());
         doNothing().when(spy).messageProcessed(message);
         spy.notifyPendingReceivedCallback(message, null);
-        Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+        Message<byte[]> receivedMessage = receiveFuture.join();
 
         verify(spy, times(1)).beforeConsume(message);
         verify(spy, times(1)).messageProcessed(message);
@@ -154,4 +147,26 @@ public class ConsumerImplTest {
         Assert.assertFalse(receiveFuture.isCompletedExceptionally());
         Assert.assertEquals(receivedMessage, message);
     }
+
+    @Test
+    public void testReceiveAsyncCanBeCancelled() {
+        // given
+        CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+        Assert.assertEquals(consumer.peekPendingReceive(), future);
+        // when
+        future.cancel(true);
+        // then
+        Assert.assertTrue(consumer.pendingReceives.isEmpty());
+    }
+
+    @Test
+    public void testBatchReceiveAsyncCanBeCancelled() {
+        // given
+        CompletableFuture<Messages<byte[]>> future = 
consumer.batchReceiveAsync();
+        Assert.assertTrue(consumer.hasPendingBatchReceive());
+        // when
+        future.cancel(true);
+        // then
+        Assert.assertFalse(consumer.hasPendingBatchReceive());
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 5964d4f..767cb65 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -19,33 +19,30 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Sets;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.net.SocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.expectThrows;
 
@@ -87,18 +84,7 @@ public class MultiTopicsConsumerImplTest {
     @Test(timeOut = 5000)
     public void testParallelSubscribeAsync() throws Exception {
         String topicName = "parallel-subscribe-async-topic";
-        String subscriptionName = "parallel-subscribe-async-subscription";
-        String serviceUrl = "pulsar://localhost:6650";
-        Schema<byte[]> schema = Schema.BYTES;
-        ExecutorService listenerExecutor = mock(ExecutorService.class);
-        ClientConfigurationData conf = new ClientConfigurationData();
-        conf.setServiceUrl(serviceUrl);
-        ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
-        consumerConfData.setSubscriptionName(subscriptionName);
-        int completionDelayMillis = 100;
-        PulsarClientImpl client = setUpPulsarClientMock(schema, 
completionDelayMillis);
-        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<>(client, consumerConfData, listenerExecutor,
-            new CompletableFuture<>(), schema, null, true);
+        MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
 
         CompletableFuture<Void> firstInvocation = 
impl.subscribeAsync(topicName, true);
         Thread.sleep(5); // less than completionDelayMillis
@@ -111,36 +97,44 @@ public class MultiTopicsConsumerImplTest {
         assertTrue(cause.getMessage().endsWith("Topic is already being 
subscribed for in other thread."));
     }
 
-    private <T> PulsarClientImpl setUpPulsarClientMock(Schema<T> schema, int 
completionDelayMillis) {
-        PulsarClientImpl clientMock = mock(PulsarClientImpl.class, 
Mockito.RETURNS_DEEP_STUBS);
-
-        
when(clientMock.getConfiguration()).thenReturn(mock(ClientConfigurationData.class));
-        when(clientMock.timer()).thenReturn(mock(Timer.class));
+    private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
+        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
+        consumerConfData.setSubscriptionName("subscriptionName");
+        int completionDelayMillis = 100;
+        Schema<byte[]> schema = Schema.BYTES;
+        PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx();
         
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createDelayedCompletedFuture(
-            new PartitionedTopicMetadata(), completionDelayMillis));
-        when(clientMock.<T>preProcessSchemaBeforeSubscribe(any(), any(), 
any()))
-            .thenReturn(CompletableFuture.completedFuture(schema));
-        
when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
-        
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
-
-        ClientCnx clientCnxMock = mock(ClientCnx.class, 
Mockito.RETURNS_DEEP_STUBS);
-        
when(clientCnxMock.ctx()).thenReturn(mock(ChannelHandlerContext.class));
-        when(clientCnxMock.sendRequestWithId(any(), anyLong()))
-            
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
-        
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
-        
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
-
-        return clientMock;
+                new PartitionedTopicMetadata(), completionDelayMillis));
+        when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), 
any()))
+                .thenReturn(CompletableFuture.completedFuture(schema));
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+            new CompletableFuture<>(), schema, null, true);
+        return impl;
+    }
+
+    @Test
+    public void testReceiveAsyncCanBeCancelled() {
+        // given
+        MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
+        CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+        assertEquals(consumer.peekPendingReceive(), future);
+        // when
+        future.cancel(true);
+        // then
+        assertTrue(consumer.pendingReceives.isEmpty());
     }
 
-    private <T> CompletableFuture<T> createDelayedCompletedFuture(T result, 
int delayMillis) {
-        return CompletableFuture.supplyAsync(() -> {
-            try {
-                Thread.sleep(delayMillis);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-            return result;
-        });
+    @Test
+    public void testBatchReceiveAsyncCanBeCancelled() {
+        // given
+        MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
+        CompletableFuture<Messages<byte[]>> future = 
consumer.batchReceiveAsync();
+        assertTrue(consumer.hasPendingBatchReceive());
+        // when
+        future.cancel(true);
+        // then
+        assertFalse(consumer.hasPendingBatchReceive());
     }
+
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
new file mode 100644
index 0000000..e4d50db
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class ReaderImplTest {
+    ReaderImpl<byte[]> reader;
+
+    @BeforeMethod
+    void setupReader() {
+        PulsarClientImpl mockedClient = 
ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
+        ReaderConfigurationData<byte[]> readerConfiguration = new 
ReaderConfigurationData<>();
+        readerConfiguration.setTopicName("topicName");
+        CompletableFuture<Consumer<byte[]>> consumerFuture = new 
CompletableFuture<>();
+        reader = new ReaderImpl<>(mockedClient, readerConfiguration, 
ClientTestFixtures.createMockedExecutor(), consumerFuture, Schema.BYTES);
+    }
+
+    @Test
+    void shouldSupportCancellingReadNextAsync() {
+        // given
+        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        assertNotNull(reader.getConsumer().peekPendingReceive());
+
+        // when
+        future.cancel(false);
+
+        // then
+        assertNull(reader.getConsumer().peekPendingReceive());
+    }
+}

Reply via email to