This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 33f30737616 [improve][java-client] Only trigger the batch receive
timeout when having pending batch receives requests (#16160)
33f30737616 is described below
commit 33f3073761619ffd47def215545eaa38c41455cf
Author: lipenghui <[email protected]>
AuthorDate: Thu Jun 23 09:04:25 2022 +0800
[improve][java-client] Only trigger the batch receive timeout when having
pending batch receives requests (#16160)
The consumer applies the default batch receive policy even if the user
does not use the batch receive API.
https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
This consumes a lot of CPU if the client has many consumers (e.g. 100k
consumers).
The Pulsar perf tool can also reproduce the problem if the test is run with
many consumers.
If there is no pending batch receive operation for a consumer, there is no
need to trigger the batch timeout task periodically. We can start the timeout
check only after adding a batch receive request to the pending request queue.
Remove the lock in MultiTopicsConsumerImpl, as #10352 does.
Added a new test to verify that the batch receive timeout task does not start
if there is no batch receive request.
(cherry picked from commit a0ccdc96bb05d19651f3778c23b89425d516d77a)
---
.../client/api/ConsumerBatchReceiveTest.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 21 ++++++++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../client/impl/MultiTopicsConsumerImpl.java | 20 +++------
.../client/impl/MultiTopicsConsumerImplTest.java | 2 +-
5 files changed, 71 insertions(+), 20 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 1473f280753..19cb25664b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;
import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -48,6 +50,14 @@ public class ConsumerBatchReceiveTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @DataProvider(name = "partitioned")
+ public Object[][] partitionedTopicProvider() {
+ return new Object[][] {
+ { true },
+ { false }
+ };
+ }
+
@DataProvider(name = "batchReceivePolicy")
public Object[][] batchReceivePolicyProvider() {
return new Object[][] {
@@ -425,6 +435,43 @@ public class ConsumerBatchReceiveTest extends
ProducerConsumerBase {
latch.await();
}
+ @Test(dataProvider = "partitioned")
+ public void testBatchReceiveTimeoutTask(boolean partitioned) throws
Exception {
+ final String topic = "persistent://my-property/my-ns/batch-receive-" +
UUID.randomUUID();
+
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topic, 3);
+ }
+
+ @Cleanup
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .receiverQueueSize(1)
+ .batchReceivePolicy(BatchReceivePolicy.builder()
+ .maxNumBytes(1024 * 1024)
+ .maxNumMessages(1)
+ .timeout(5, TimeUnit.SECONDS)
+ .build())
+ .subscribe();
+
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+ final int messagesToSend = 500;
+ sendMessagesAsyncAndWait(producer, messagesToSend);
+ for (int i = 0; i < 100; i++) {
+ Assert.assertNotNull(consumer.receive());
+ }
+
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+ for (int i = 0; i < 400; i++) {
+ Messages<String> batchReceived = consumer.batchReceive();
+ Assert.assertEquals(batchReceived.size(), 1);
+ }
+ Awaitility.await().untilAsserted(() ->
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+ Assert.assertEquals(consumer.batchReceive().size(), 0);
+ Awaitility.await().untilAsserted(() ->
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+ }
+
private void
receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String>
consumer,
BatchReceivePolicy batchReceivePolicy,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index c53d49ad4bd..f246c550c0e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -134,9 +134,12 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
} else {
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
}
+ }
- if (batchReceivePolicy.getTimeoutMs() > 0) {
- batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
+ protected void triggerBatchReceiveTimeoutTask() {
+ if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() >
0) {
+ batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
+ batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
}
@@ -866,6 +869,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
long timeToWaitMs;
+ boolean hasPendingReceives = false;
synchronized (this) {
// If it's closing/closed we need to ignore this timeout and not
schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
@@ -902,13 +906,18 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
} else {
// The diff is greater than zero, set the timeout to the
diff value
timeToWaitMs = diff;
+ hasPendingReceives = true;
break;
}
opBatchReceive = pendingBatchReceives.peek();
}
- batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
- timeToWaitMs, TimeUnit.MILLISECONDS);
+ if (hasPendingReceives) {
+ batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
+ timeToWaitMs, TimeUnit.MILLISECONDS);
+ } else {
+ batchReceiveTimeout = null;
+ }
}
}
@@ -1014,6 +1023,10 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
resetIncomingMessageSize();
}
+ public boolean hasBatchReceiveTimeout() {
+ return batchReceiveTimeout != null;
+ }
+
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
private ExecutorService getExternalExecutor(Message<T> msg) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1a185d4c17d..94531479236 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -500,6 +500,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else {
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
+ triggerBatchReceiveTimeoutTask();
cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
});
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index be125596835..226e2a5055d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -68,8 +68,6 @@ import
org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -100,8 +98,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private final ConsumerStatsRecorder stats;
private UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
@@ -387,8 +383,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result =
cancellationHandler.createFuture();
- try {
- lock.writeLock().lock();
+ internalPinnedExecutor.execute(() -> {
if (hasEnoughMessagesForBatchReceive()) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
@@ -405,13 +400,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
} else {
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
+ triggerBatchReceiveTimeoutTask();
cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
resumeReceivingFromPausedConsumersIfNeeded();
- } finally {
- lock.writeLock().unlock();
- }
-
+ });
return result;
}
@@ -639,17 +632,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
- lock.writeLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
- } finally {
- lock.writeLock().unlock();
- }
+ });
resumeReceivingFromPausedConsumersIfNeeded();
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 38e66807228..fe818069408 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -178,7 +178,7 @@ public class MultiTopicsConsumerImplTest {
// given
MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
CompletableFuture<Messages<byte[]>> future =
consumer.batchReceiveAsync();
- assertTrue(consumer.hasPendingBatchReceive());
+ Awaitility.await().untilAsserted(() ->
assertTrue(consumer.hasPendingBatchReceive()));
// when
future.cancel(true);
// then