This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 11d4d07 [Issue 8300][Java client] Support cancelling message & batch
futures returned from Reader & Consumer (#8326)
11d4d07 is described below
commit 11d4d077f5bb009297b32e19b3eb1ace3660d0c5
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Oct 26 10:13:15 2020 +0200
[Issue 8300][Java client] Support cancelling message & batch futures
returned from Reader & Consumer (#8326)
Fixes #8300, #8307
### Motivation
Explained in #8300
### Modifications
The
`org.apache.pulsar.client.api.Reader.readNextAsync`,
`org.apache.pulsar.client.api.Consumer.receiveAsync` and
`org.apache.pulsar.client.api.Consumer.batchReceiveAsync` methods
now return a `CompletableFuture` with an attached "whenComplete" callback which
handles `CancellationException` and `TimeoutException`. The callback action will
remove the registered future from the internal "pendingReceives" or
"pendingBatchReceives" queue.
The changes also attempt to guard against possible race conditions that could
happen when a cancellation and a received message occur at exactly the same moment
and the future removed from the queue has already been cancelled.
While adding unit tests, some minor refactoring was made to start adding a
ClientTestFixtures class that would make it easier to share some test mocks
that are used in the client internal unit tests. The intention of this change
was to start lowering the barrier for adding better test coverage for further
changes.
---
.../org/apache/pulsar/client/api/Consumer.java | 14 +++
.../java/org/apache/pulsar/client/api/Reader.java | 9 ++
.../impl/CompletableFutureCancellationHandler.java | 106 ++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 120 ++++++++++++++++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 22 ++--
.../client/impl/MultiTopicsConsumerImpl.java | 15 ++-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 9 +-
.../pulsar/client/impl/ClientTestFixtures.java | 78 ++++++++++++++
.../CompletableFutureCancellationHandlerTest.java | 56 ++++++++++
.../pulsar/client/impl/ConsumerImplTest.java | 59 ++++++----
.../client/impl/MultiTopicsConsumerImplTest.java | 92 ++++++++--------
.../apache/pulsar/client/impl/ReaderImplTest.java | 57 ++++++++++
12 files changed, 533 insertions(+), 104 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 801896c..4827a8e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -88,6 +88,12 @@ public interface Consumer<T> extends Closeable {
* <p>{@code receiveAsync()} should be called subsequently once returned
{@code CompletableFuture} gets complete
* with received message. Else it creates <i> backlog of receive requests
</i> in the application.
*
+ * <p>The returned future can be cancelled before completion by calling
{@code .cancel(false)}
+ * ({@link CompletableFuture#cancel(boolean)}) to remove it from the
backlog of receive requests. Another
+ * choice for ensuring a proper clean up of the returned future is to use
the CompletableFuture.orTimeout method
+ * which is available on JDK9+. That would remove it from the backlog of
receive requests if receiving exceeds
+ * the timeout.
+ *
* @return {@link CompletableFuture}<{@link Message}> will be completed
when message is available
*/
CompletableFuture<Message<T>> receiveAsync();
@@ -129,6 +135,14 @@ public interface Consumer<T> extends Closeable {
* {@code batchReceiveAsync()} should be called subsequently once returned
{@code CompletableFuture} gets complete
* with received messages. Else it creates <i> backlog of receive requests
</i> in the application.
* </p>
+ *
+ * <p>The returned future can be cancelled before completion by calling
{@code .cancel(false)}
+ * ({@link CompletableFuture#cancel(boolean)}) to remove it from the
backlog of receive requests. Another
+ * choice for ensuring a proper clean up of the returned future is to use
the CompletableFuture.orTimeout method
+ * which is available on JDK9+. That would remove it from the backlog of
receive requests if receiving exceeds
+ * the timeout.
+ *
+ *
* @return messages
* @since 2.4.1
* @throws PulsarClientException
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index b7cb96a..492a1cf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -55,6 +55,15 @@ public interface Reader<T> extends Closeable {
/**
* Read asynchronously the next message in the topic.
*
+ * <p>{@code readNextAsync()} should be called subsequently once returned
{@code CompletableFuture} gets complete
+ * with received message. Else it creates <i> backlog of receive requests
</i> in the application.
+ *
+ * <p>The returned future can be cancelled before completion by calling
{@code .cancel(false)}
+ * ({@link CompletableFuture#cancel(boolean)}) to remove it from the
backlog of receive requests. Another
+ * choice for ensuring a proper clean up of the returned future is to use
the CompletableFuture.orTimeout method
+ * which is available on JDK9+. That would remove it from the backlog of
receive requests if receiving exceeds
+ * the timeout.
+ *
* @return a future that will yield a message (when it's available) or
{@link PulsarClientException} if the reader
* is already closed.
*/
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
new file mode 100644
index 0000000..1deb387
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandler.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+/**
+ * Implements cancellation and timeout support for CompletableFutures.
+ * <p>
+ * This class ensures that the cancel action gets called once after the future
completes with
+ * either {@link CancellationException} or {@link TimeoutException}.
+ * The implementation handles possible race conditions that
+ * might happen when the future gets cancelled before the cancel action is set
to this handler.
+ * <p>
+ * For timeouts, CompletableFuture's "orTimeout" method introduced in JDK9
+ * can be used in client code.
+ * <p>
+ * Cancellation and timeout support will only be active on the future where the
+ * cancellation handler has been attached to. Cancellation won't happen if
.cancel is called on
+ * any "downstream" dependent futures. A cancellation or timeout that happens
in any "upstream"
+ * future will get handled.
+ */
+class CompletableFutureCancellationHandler {
+ private volatile boolean cancelled;
+ private volatile Runnable cancelAction;
+ private final AtomicBoolean cancelHandled = new AtomicBoolean();
+
+ /**
+ * Creates a new {@link CompletableFuture} and attaches the cancellation
handler
+ * to handle cancels and timeouts.
+ *
+ * @param <T> the result type of the future
+ * @return a new future instance
+ */
+ public <T> CompletableFuture<T> createFuture() {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ attachToFuture(future);
+ return future;
+ }
+
+ /**
+ * Attaches the cancellation handler to handle cancels
+ * and timeouts
+ *
+ * @param future the future to attach the handler to
+ * @param <T> the result type of the future
+ */
+ public <T> void attachToFuture(CompletableFuture<T> future) {
+ future.whenComplete(whenCompleteFunction());
+ }
+
+ /**
+ * Set the action to run when the future gets cancelled or timeouts.
+ * The cancellation or timeout might be originating from any "upstream"
future.
+ * The implementation ensures that the cancel action gets called once.
+ * Handles possible race conditions that might happen when the future gets
cancelled
+ * before the cancel action is set to this handler. In this case, the
+ * cancel action gets called when the action is set.
+ *
+ * @param cancelAction the action to run when the future gets
cancelled or timeouts
+ */
+ public void setCancelAction(Runnable cancelAction) {
+ if (this.cancelAction != null) {
+ throw new IllegalStateException("cancelAction can only be set
once.");
+ }
+ this.cancelAction = Objects.requireNonNull(cancelAction);
+ // handle race condition in the case that the future was already
cancelled when the handler is set
+ runCancelActionOnceIfCancelled();
+ }
+
+ private <T> BiConsumer<? super T, ? super Throwable>
whenCompleteFunction() {
+ return (T t, Throwable throwable) -> {
+ if (throwable instanceof CancellationException || throwable
instanceof TimeoutException) {
+ cancelled = true;
+ }
+ runCancelActionOnceIfCancelled();
+ };
+ }
+
+ private void runCancelActionOnceIfCancelled() {
+ if (cancelled && cancelAction != null &&
cancelHandled.compareAndSet(false, true)) {
+ cancelAction.run();
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 8c67f2c..1c40449 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -199,15 +199,57 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
}
}
+ protected CompletableFuture<Message<T>> peekPendingReceive() {
+ CompletableFuture<Message<T>> receivedFuture = null;
+ while (receivedFuture == null) {
+ receivedFuture = pendingReceives.peek();
+ if (receivedFuture == null) {
+ break;
+ }
+ // skip done futures (cancelling a future could mark it done)
+ if (receivedFuture.isDone()) {
+ CompletableFuture<Message<T>> removed = pendingReceives.poll();
+ if (removed != receivedFuture) {
+ log.error("Bug! Removed future wasn't the expected one.
expected={} removed={}", receivedFuture, removed);
+ }
+ receivedFuture = null;
+ }
+ }
+ return receivedFuture;
+ }
+
+ protected CompletableFuture<Message<T>> pollPendingReceive() {
+ CompletableFuture<Message<T>> receivedFuture;
+ while (true) {
+ receivedFuture = pendingReceives.poll();
+ // skip done futures (cancelling a future could mark it done)
+ if (receivedFuture == null || !receivedFuture.isDone()) {
+ break;
+ }
+ }
+ return receivedFuture;
+ }
+
+ protected void completePendingReceive(CompletableFuture<Message<T>>
receivedFuture, Message<T> message) {
+ listenerExecutor.execute(() -> {
+ if (!receivedFuture.complete(message)) {
+ log.warn("Race condition detected. receive future was already
completed (cancelled={}) and message was dropped. message={}",
+ receivedFuture.isCancelled(), message);
+ }
+ });
+ }
+
protected void
failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives) {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
if (receiveFuture == null) {
break;
}
- receiveFuture.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
- "was already closed when cleaning and closing the
consumers", topic, subscription)));
+ if (!receiveFuture.isDone()) {
+ receiveFuture.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing
the consumers", topic, subscription)));
+ }
}
}
@@ -217,9 +259,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
if (opBatchReceive == null || opBatchReceive.future == null) {
break;
}
- opBatchReceive.future.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
- "was already closed when cleaning and closing the
consumers", topic, subscription)));
+ if (!opBatchReceive.future.isDone()) {
+ opBatchReceive.future.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
+ "was already closed when cleaning and closing
the consumers", topic, subscription)));
+ }
}
}
@@ -677,8 +721,8 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
protected void notifyPendingBatchReceivedCallBack() {
- final OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
- if (opBatchReceive == null || opBatchReceive.future == null) {
+ OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
+ if (opBatchReceive == null) {
return;
}
try {
@@ -689,7 +733,44 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
}
- protected void notifyPendingBatchReceivedCallBack(OpBatchReceive<T>
opBatchReceive) {
+ private OpBatchReceive<T> peekNextBatchReceive() {
+ OpBatchReceive<T> opBatchReceive = null;
+ while (opBatchReceive == null) {
+ opBatchReceive = pendingBatchReceives.peek();
+ // no entry available
+ if (opBatchReceive == null) {
+ return null;
+ }
+ // remove entries where future is null or has been completed
(cancel / timeout)
+ if (opBatchReceive.future == null ||
opBatchReceive.future.isDone()) {
+ OpBatchReceive<T> removed = pendingBatchReceives.poll();
+ if (removed != opBatchReceive) {
+ log.error("Bug: Removed entry wasn't the expected one.
expected={}, removed={}", opBatchReceive, removed);
+ }
+ opBatchReceive = null;
+ }
+ }
+ return opBatchReceive;
+ }
+
+
+ private OpBatchReceive<T> pollNextBatchReceive() {
+ OpBatchReceive<T> opBatchReceive = null;
+ while (opBatchReceive == null) {
+ opBatchReceive = pendingBatchReceives.poll();
+ // no entry available
+ if (opBatchReceive == null) {
+ return null;
+ }
+ // skip entries where future is null or has been completed (cancel
/ timeout)
+ if (opBatchReceive.future == null ||
opBatchReceive.future.isDone()) {
+ opBatchReceive = null;
+ }
+ }
+ return opBatchReceive;
+ }
+
+ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T>
opBatchReceive) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
@@ -701,7 +782,14 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
msgPeeked = incomingMessages.peek();
}
- opBatchReceive.future.complete(messages);
+ completePendingBatchReceive(opBatchReceive.future, messages);
+ }
+
+ protected void completePendingBatchReceive(CompletableFuture<Messages<T>>
future, Messages<T> messages) {
+ if (!future.complete(messages)) {
+ log.warn("Race condition detected. batch receive future was
already completed (cancelled={}) and messages were dropped. messages={}",
+ future.isCancelled(), messages);
+ }
}
protected abstract void messageProcessed(Message<?> msg);
@@ -722,7 +810,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
- OpBatchReceive<T> firstOpBatchReceive =
pendingBatchReceives.peek();
+ OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
timeToWaitMs = batchReceivePolicy.getTimeoutMs();
while (firstOpBatchReceive != null) {
@@ -733,9 +821,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
if (diff <= 0) {
// The diff is less than or equal to zero, meaning that
the batch receive has been timed out.
// complete the OpBatchReceive and continue to check the
next OpBatchReceive in pendingBatchReceives.
- OpBatchReceive<T> op = pendingBatchReceives.poll();
- completeOpBatchReceive(op);
- firstOpBatchReceive = pendingBatchReceives.peek();
+ OpBatchReceive<T> op = pollNextBatchReceive();
+ if (op != null) {
+ completeOpBatchReceive(op);
+ }
+ firstOpBatchReceive = peekNextBatchReceive();
} else {
// The diff is greater than zero, set the timeout to the
diff value
timeToWaitMs = diff;
@@ -752,7 +842,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
protected boolean hasPendingBatchReceive() {
- return pendingBatchReceives != null && !pendingBatchReceives.isEmpty();
+ return pendingBatchReceives != null && peekNextBatchReceive() != null;
}
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c73d2f1..73d5bb4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -89,7 +89,6 @@ import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAckResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
@@ -109,7 +108,6 @@ import
org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
-import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -400,14 +398,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
-
- CompletableFuture<Message<T>> result = new CompletableFuture<>();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
Message<T> message = null;
try {
lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
pendingReceives.add(result);
+ cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -462,7 +461,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
@Override
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
- CompletableFuture<Messages<T>> result = new CompletableFuture<>();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Messages<T>> result =
cancellationHandler.createFuture();
try {
lock.writeLock().lock();
if (pendingBatchReceives == null) {
@@ -482,7 +482,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
result.complete(messages);
} else {
- pendingBatchReceives.add(OpBatchReceive.of(result));
+ OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
+ pendingBatchReceives.add(opBatchReceive);
+ cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
} finally {
lock.writeLock().unlock();
@@ -1196,7 +1198,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (deadLetterPolicy != null &&
possibleSendToDeadLetterTopicMessages != null && redeliveryCount >=
deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
}
- if (!pendingReceives.isEmpty()) {
+ if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message)) {
if (hasPendingBatchReceive()) {
@@ -1349,7 +1351,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
// fetch receivedCallback from queue
- final CompletableFuture<Message<T>> receivedFuture =
pendingReceives.poll();
+ final CompletableFuture<Message<T>> receivedFuture =
pollPendingReceive();
if (receivedFuture == null) {
return;
}
@@ -1382,7 +1384,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// call proper interceptor
final Message<T> interceptMessage = beforeConsume(message);
// return message to receivedCallback
- listenerExecutor.execute(() ->
receivedFuture.complete(interceptMessage));
+ completePendingReceive(receivedFuture, interceptMessage);
}
void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int
redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
@@ -1458,7 +1460,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
lock.readLock().lock();
try {
- if (!pendingReceives.isEmpty()) {
+ if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message)) {
if (hasPendingBatchReceive()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 41bb237..bdaa94b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -243,10 +243,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
// if asyncReceive is waiting : return message to callback without
adding to incomingMessages queue
- CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+ CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId());
- listenerExecutor.execute(() ->
receivedFuture.complete(topicMessage));
+ completePendingReceive(receivedFuture, topicMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) &&
hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
@@ -352,7 +352,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
- CompletableFuture<Messages<T>> result = new CompletableFuture<>();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Messages<T>> result =
cancellationHandler.createFuture();
try {
lock.writeLock().lock();
if (pendingBatchReceives == null) {
@@ -372,7 +373,9 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
result.complete(messages);
} else {
- pendingBatchReceives.add(OpBatchReceive.of(result));
+ OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
+ pendingBatchReceives.add(opBatchReceive);
+ cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
resumeReceivingFromPausedConsumersIfNeeded();
} finally {
@@ -383,10 +386,12 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
- CompletableFuture<Message<T>> result = new CompletableFuture<>();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Message<T>> result =
cancellationHandler.createFuture();
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
+ cancellationHandler.setCancelAction(() ->
pendingReceives.remove(result));
} else {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
-message.getData().length);
checkState(message instanceof TopicMessageImpl);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 4f172e3..79fc1c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -132,10 +132,13 @@ public class ReaderImpl<T> implements Reader<T> {
@Override
public CompletableFuture<Message<T>> readNextAsync() {
- return consumer.receiveAsync().thenApply(msg -> {
- consumer.acknowledgeCumulativeAsync(msg);
- return msg;
+ CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+ receiveFuture.whenComplete((msg, t) -> {
+ if (msg != null) {
+ consumer.acknowledgeCumulativeAsync(msg);
+ }
});
+ return receiveFuture;
}
@Override
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
new file mode 100644
index 0000000..a1653c1
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoop;
+import io.netty.util.Timer;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.mockito.Mockito;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.*;
+
+class ClientTestFixtures {
+ public static ScheduledExecutorService SCHEDULER =
Executors.newSingleThreadScheduledExecutor();
+
+ static <T> PulsarClientImpl createPulsarClientMock() {
+ PulsarClientImpl clientMock = mock(PulsarClientImpl.class,
Mockito.RETURNS_DEEP_STUBS);
+
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ when(clientMock.getConfiguration()).thenReturn(clientConf);
+ when(clientMock.timer()).thenReturn(mock(Timer.class));
+
+
when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
+
+ return clientMock;
+ }
+
+ static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx() {
+ return mockClientCnx(createPulsarClientMock());
+ }
+
+ static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) {
+ ClientCnx clientCnxMock = mock(ClientCnx.class,
Mockito.RETURNS_DEEP_STUBS);
+
when(clientCnxMock.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+ when(clientCnxMock.sendRequestWithId(any(), anyLong()))
+
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
+
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
+
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
+ return clientMock;
+ }
+
+ static <T> CompletableFuture<T> createDelayedCompletedFuture(T result, int
delayMillis) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ SCHEDULER.schedule(() -> future.complete(result), delayMillis,
TimeUnit.MILLISECONDS);
+ return future;
+ }
+
+ public static ExecutorService createMockedExecutor() {
+ return mock(ExecutorService.class);
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
new file mode 100644
index 0000000..9582b73
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/CompletableFutureCancellationHandlerTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.testng.Assert.assertTrue;
+
+public class CompletableFutureCancellationHandlerTest {
+
+ @Test
+ public void callsCancelActionWhenCancelled() {
+ // given
+ AtomicBoolean called = new AtomicBoolean();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Object> future = cancellationHandler.createFuture();
+ cancellationHandler.setCancelAction(() -> called.set(true));
+ // when
+ future.cancel(false);
+ // then
+ assertTrue(called.get());
+ }
+
+ @Test
+ public void callsCancelActionWhenTimeoutHappens() {
+ // given
+ AtomicBoolean called = new AtomicBoolean();
+ CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
+ CompletableFuture<Object> future = cancellationHandler.createFuture();
+ cancellationHandler.setCancelAction(() -> called.set(true));
+ // when
+ future.completeExceptionally(new TimeoutException());
+ // then
+ assertTrue(called.get());
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index dcb2e36..75d7753 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -19,16 +19,13 @@
package org.apache.pulsar.client.impl;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
@@ -37,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -48,29 +46,24 @@ public class ConsumerImplTest {
private final ExecutorService executorService =
Executors.newSingleThreadExecutor();
- private ConsumerImpl<ConsumerImpl> consumer;
+ private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
@BeforeMethod
public void setUp() {
consumerConf = new ConsumerConfigurationData<>();
- ClientConfigurationData clientConf = new ClientConfigurationData();
- PulsarClientImpl client = mock(PulsarClientImpl.class);
- CompletableFuture<ClientCnx> clientCnxFuture = new
CompletableFuture<>();
- CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new
CompletableFuture<>();
- String topic = "non-persistent://tenant/ns1/my-topic";
-
- // Mock connection for grabCnx()
- when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
+ PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
+ ClientConfigurationData clientConf = client.getConfiguration();
clientConf.setOperationTimeoutMs(100);
clientConf.setStatsIntervalSeconds(0);
- when(client.getConfiguration()).thenReturn(clientConf);
- when(client.timer()).thenReturn(mock(Timer.class));
+ CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new
CompletableFuture<>();
+ String topic = "non-persistent://tenant/ns1/my-topic";
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
executorService, -1, false, subscribeFuture, null, null, null,
true);
+ consumer.setState(HandlerState.State.Ready);
}
@Test(invocationTimeOut = 1000)
@@ -88,7 +81,7 @@ public class ConsumerImplTest {
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_CompleteWithException() {
- CompletableFuture<Message<ConsumerImpl>> receiveFuture = new
CompletableFuture<>();
+ CompletableFuture<Message<byte[]>> receiveFuture = new
CompletableFuture<>();
consumer.pendingReceives.add(receiveFuture);
Exception exception = new
PulsarClientException.InvalidMessageException("some random exception");
consumer.notifyPendingReceivedCallback(null, exception);
@@ -105,7 +98,7 @@ public class ConsumerImplTest {
@Test(invocationTimeOut = 1000)
public void
testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() {
- CompletableFuture<Message<ConsumerImpl>> receiveFuture = new
CompletableFuture<>();
+ CompletableFuture<Message<byte[]>> receiveFuture = new
CompletableFuture<>();
consumer.pendingReceives.add(receiveFuture);
consumer.notifyPendingReceivedCallback(null, null);
@@ -120,15 +113,15 @@ public class ConsumerImplTest {
@Test(invocationTimeOut = 1000)
public void
testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() {
- CompletableFuture<Message<ConsumerImpl>> receiveFuture = new
CompletableFuture<>();
+ CompletableFuture<Message<byte[]>> receiveFuture = new
CompletableFuture<>();
MessageImpl message = mock(MessageImpl.class);
- ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+ ConsumerImpl<byte[]> spy = spy(consumer);
consumer.pendingReceives.add(receiveFuture);
consumerConf.setReceiverQueueSize(0);
doReturn(message).when(spy).beforeConsume(any());
spy.notifyPendingReceivedCallback(message, null);
- Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+ Message<byte[]> receivedMessage = receiveFuture.join();
verify(spy, times(1)).beforeConsume(message);
Assert.assertTrue(receiveFuture.isDone());
@@ -138,15 +131,15 @@ public class ConsumerImplTest {
@Test(invocationTimeOut = 1000)
public void testNotifyPendingReceivedCallback_WorkNormally() {
- CompletableFuture<Message<ConsumerImpl>> receiveFuture = new
CompletableFuture<>();
+ CompletableFuture<Message<byte[]>> receiveFuture = new
CompletableFuture<>();
MessageImpl message = mock(MessageImpl.class);
- ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+ ConsumerImpl<byte[]> spy = spy(consumer);
consumer.pendingReceives.add(receiveFuture);
doReturn(message).when(spy).beforeConsume(any());
doNothing().when(spy).messageProcessed(message);
spy.notifyPendingReceivedCallback(message, null);
- Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+ Message<byte[]> receivedMessage = receiveFuture.join();
verify(spy, times(1)).beforeConsume(message);
verify(spy, times(1)).messageProcessed(message);
@@ -154,4 +147,26 @@ public class ConsumerImplTest {
Assert.assertFalse(receiveFuture.isCompletedExceptionally());
Assert.assertEquals(receivedMessage, message);
}
+
+ /**
+ * Cancelling the future returned by {@code receiveAsync()} must leave
+ * {@code pendingReceives} empty.
+ */
+ @Test
+ public void testReceiveAsyncCanBeCancelled() {
+ // given: the returned future is the one registered as the pending receive
+ CompletableFuture<Message<byte[]>> receiveFuture = consumer.receiveAsync();
+ Assert.assertEquals(consumer.peekPendingReceive(), receiveFuture);
+ // when: the caller cancels it
+ receiveFuture.cancel(true);
+ // then: nothing is left pending
+ boolean noPendingReceives = consumer.pendingReceives.isEmpty();
+ Assert.assertTrue(noPendingReceives);
+ }
+
+ /**
+ * Cancelling the future returned by {@code batchReceiveAsync()} must clear the
+ * consumer's pending batch receive.
+ */
+ @Test
+ public void testBatchReceiveAsyncCanBeCancelled() {
+ // given: requesting a batch registers a pending batch receive
+ CompletableFuture<Messages<byte[]>> batchFuture = consumer.batchReceiveAsync();
+ Assert.assertTrue(consumer.hasPendingBatchReceive());
+ // when: the caller cancels it
+ batchFuture.cancel(true);
+ // then: no batch receive is pending any more
+ boolean stillPending = consumer.hasPendingBatchReceive();
+ Assert.assertFalse(stillPending);
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 5964d4f..767cb65 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -19,33 +19,30 @@
package org.apache.pulsar.client.impl;
import com.google.common.collect.Sets;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
-import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.mockito.Mockito;
import org.testng.annotations.Test;
-import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
@@ -87,18 +84,7 @@ public class MultiTopicsConsumerImplTest {
@Test(timeOut = 5000)
public void testParallelSubscribeAsync() throws Exception {
String topicName = "parallel-subscribe-async-topic";
- String subscriptionName = "parallel-subscribe-async-subscription";
- String serviceUrl = "pulsar://localhost:6650";
- Schema<byte[]> schema = Schema.BYTES;
- ExecutorService listenerExecutor = mock(ExecutorService.class);
- ClientConfigurationData conf = new ClientConfigurationData();
- conf.setServiceUrl(serviceUrl);
- ConsumerConfigurationData<byte[]> consumerConfData = new
ConsumerConfigurationData<>();
- consumerConfData.setSubscriptionName(subscriptionName);
- int completionDelayMillis = 100;
- PulsarClientImpl client = setUpPulsarClientMock(schema,
completionDelayMillis);
- MultiTopicsConsumerImpl<byte[]> impl = new
MultiTopicsConsumerImpl<>(client, consumerConfData, listenerExecutor,
- new CompletableFuture<>(), schema, null, true);
+ MultiTopicsConsumerImpl<byte[]> impl = createMultiTopicsConsumer();
CompletableFuture<Void> firstInvocation =
impl.subscribeAsync(topicName, true);
Thread.sleep(5); // less than completionDelayMillis
@@ -111,36 +97,44 @@ public class MultiTopicsConsumerImplTest {
assertTrue(cause.getMessage().endsWith("Topic is already being
subscribed for in other thread."));
}
- private <T> PulsarClientImpl setUpPulsarClientMock(Schema<T> schema, int
completionDelayMillis) {
- PulsarClientImpl clientMock = mock(PulsarClientImpl.class,
Mockito.RETURNS_DEEP_STUBS);
-
-
when(clientMock.getConfiguration()).thenReturn(mock(ClientConfigurationData.class));
- when(clientMock.timer()).thenReturn(mock(Timer.class));
+ private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
+ ExecutorService listenerExecutor = mock(ExecutorService.class);
+ ConsumerConfigurationData<byte[]> consumerConfData = new
ConsumerConfigurationData<>();
+ consumerConfData.setSubscriptionName("subscriptionName");
+ int completionDelayMillis = 100;
+ Schema<byte[]> schema = Schema.BYTES;
+ PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx();
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation ->
createDelayedCompletedFuture(
- new PartitionedTopicMetadata(), completionDelayMillis));
- when(clientMock.<T>preProcessSchemaBeforeSubscribe(any(), any(),
any()))
- .thenReturn(CompletableFuture.completedFuture(schema));
-
when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
-
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
-
- ClientCnx clientCnxMock = mock(ClientCnx.class,
Mockito.RETURNS_DEEP_STUBS);
-
when(clientCnxMock.ctx()).thenReturn(mock(ChannelHandlerContext.class));
- when(clientCnxMock.sendRequestWithId(any(), anyLong()))
-
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
-
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
-
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
-
- return clientMock;
+ new PartitionedTopicMetadata(), completionDelayMillis));
+ when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(),
any()))
+ .thenReturn(CompletableFuture.completedFuture(schema));
+ MultiTopicsConsumerImpl<byte[]> impl = new
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+ new CompletableFuture<>(), schema, null, true);
+ return impl;
+ }
+
+ /**
+ * Cancelling the future returned by {@code receiveAsync()} on a multi-topics consumer
+ * must leave {@code pendingReceives} empty.
+ */
+ @Test
+ public void testReceiveAsyncCanBeCancelled() {
+ // given: the returned future is the one registered as the pending receive
+ MultiTopicsConsumerImpl<byte[]> multiConsumer = createMultiTopicsConsumer();
+ CompletableFuture<Message<byte[]>> receiveFuture = multiConsumer.receiveAsync();
+ assertEquals(multiConsumer.peekPendingReceive(), receiveFuture);
+ // when: the caller cancels it
+ receiveFuture.cancel(true);
+ // then: nothing is left pending
+ boolean noPendingReceives = multiConsumer.pendingReceives.isEmpty();
+ assertTrue(noPendingReceives);
}
- private <T> CompletableFuture<T> createDelayedCompletedFuture(T result,
int delayMillis) {
- return CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(delayMillis);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return result;
- });
+ /**
+ * Cancelling the future returned by {@code batchReceiveAsync()} on a multi-topics
+ * consumer must clear its pending batch receive.
+ */
+ @Test
+ public void testBatchReceiveAsyncCanBeCancelled() {
+ // given: requesting a batch registers a pending batch receive
+ MultiTopicsConsumerImpl<byte[]> multiConsumer = createMultiTopicsConsumer();
+ CompletableFuture<Messages<byte[]>> batchFuture = multiConsumer.batchReceiveAsync();
+ assertTrue(multiConsumer.hasPendingBatchReceive());
+ // when: the caller cancels it
+ batchFuture.cancel(true);
+ // then: no batch receive is pending any more
+ boolean stillPending = multiConsumer.hasPendingBatchReceive();
+ assertFalse(stillPending);
}
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
new file mode 100644
index 0000000..e4d50db
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Unit tests for {@link ReaderImpl}, run against a mocked Pulsar client obtained from
+ * {@link ClientTestFixtures}.
+ */
+public class ReaderImplTest {
+    ReaderImpl<byte[]> reader;
+
+    /** Builds a fresh reader for topic {@code "topicName"} before every test method. */
+    @BeforeMethod
+    void setupReader() {
+        ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<>();
+        readerConf.setTopicName("topicName");
+        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
+        CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
+        reader = new ReaderImpl<>(client, readerConf, ClientTestFixtures.createMockedExecutor(),
+                consumerFuture, Schema.BYTES);
+    }
+
+    /**
+     * Cancelling the future returned by {@code readNextAsync()} must remove the pending
+     * receive from the reader's underlying consumer.
+     */
+    @Test
+    void shouldSupportCancellingReadNextAsync() {
+        // given: reading asynchronously registers a pending receive on the consumer
+        CompletableFuture<Message<byte[]>> readFuture = reader.readNextAsync();
+        assertNotNull(reader.getConsumer().peekPendingReceive());
+
+        // when: the caller cancels the read
+        readFuture.cancel(false);
+
+        // then: the consumer no longer has a pending receive
+        assertNull(reader.getConsumer().peekPendingReceive());
+    }
+}