This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a0ccdc96bb0 [improve][java-client] Only trigger the batch receive
timeout when having pending batch receives requests (#16160)
a0ccdc96bb0 is described below
commit a0ccdc96bb05d19651f3778c23b89425d516d77a
Author: lipenghui <[email protected]>
AuthorDate: Thu Jun 23 09:04:25 2022 +0800
[improve][java-client] Only trigger the batch receive timeout when having
pending batch receives requests (#16160)
### Motivation
The consumer applies the default batch receive policy even if the user
never uses the batch receive API.
https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
This consumes a lot of CPU if the client has many consumers (e.g. 100k
consumers).
The Pulsar perf tool can also reproduce the problem when the test is run
with many consumers.
### Modification
If a consumer has no pending batch receive operation, there is no need to
trigger the
batch timeout task periodically. The timeout check is now started only
after a batch
receive request is added to the pending request queue.
Remove the lock in MultiTopicsConsumerImpl, as #10352 does, and run the affected operations on the internal pinned executor instead.
### Verification
Added a new test to verify that the batch receive timeout task is not
started when
there is no batch receive request.
---
.../client/api/ConsumerBatchReceiveTest.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 23 ++++++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 1 +
.../client/impl/MultiTopicsConsumerImpl.java | 20 +++------
.../client/impl/MultiTopicsConsumerImplTest.java | 2 +-
5 files changed, 71 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 8f3d6423afb..8109e8ce8eb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;
import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -48,6 +50,14 @@ public class ConsumerBatchReceiveTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @DataProvider(name = "partitioned")
+ public Object[][] partitionedTopicProvider() {
+ return new Object[][] {
+ { true },
+ { false }
+ };
+ }
+
@DataProvider(name = "batchReceivePolicy")
public Object[][] batchReceivePolicyProvider() {
return new Object[][] {
@@ -425,6 +435,43 @@ public class ConsumerBatchReceiveTest extends
ProducerConsumerBase {
latch.await();
}
+ @Test(dataProvider = "partitioned")
+ public void testBatchReceiveTimeoutTask(boolean partitioned) throws
Exception {
+ final String topic = "persistent://my-property/my-ns/batch-receive-" +
UUID.randomUUID();
+
+ if (partitioned) {
+ admin.topics().createPartitionedTopic(topic, 3);
+ }
+
+ @Cleanup
+ Producer<String> producer =
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .receiverQueueSize(1)
+ .batchReceivePolicy(BatchReceivePolicy.builder()
+ .maxNumBytes(1024 * 1024)
+ .maxNumMessages(1)
+ .timeout(5, TimeUnit.SECONDS)
+ .build())
+ .subscribe();
+
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+ final int messagesToSend = 500;
+ sendMessagesAsyncAndWait(producer, messagesToSend);
+ for (int i = 0; i < 100; i++) {
+ Assert.assertNotNull(consumer.receive());
+ }
+
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+ for (int i = 0; i < 400; i++) {
+ Messages<String> batchReceived = consumer.batchReceive();
+ Assert.assertEquals(batchReceived.size(), 1);
+ }
+ Awaitility.await().untilAsserted(() ->
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+ Assert.assertEquals(consumer.batchReceive().size(), 0);
+ Awaitility.await().untilAsserted(() ->
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+ }
+
private void
receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String>
consumer,
BatchReceivePolicy batchReceivePolicy,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index c100a8e571d..a129091d609 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -164,12 +164,14 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
}
- if (batchReceivePolicy.getTimeoutMs() > 0) {
+ initReceiverQueueSize();
+ }
+
+ protected void triggerBatchReceiveTimeoutTask() {
+ if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() >
0) {
batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
-
- initReceiverQueueSize();
}
public void initReceiverQueueSize() {
@@ -956,7 +958,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
long timeToWaitMs;
-
+ boolean hasPendingReceives = false;
synchronized (this) {
// If it's closing/closed we need to ignore this timeout and not
schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
@@ -993,13 +995,18 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
} else {
// The diff is greater than zero, set the timeout to the
diff value
timeToWaitMs = diff;
+ hasPendingReceives = true;
break;
}
opBatchReceive = pendingBatchReceives.peek();
}
- batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
- timeToWaitMs, TimeUnit.MILLISECONDS);
+ if (hasPendingReceives) {
+ batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
+ timeToWaitMs, TimeUnit.MILLISECONDS);
+ } else {
+ batchReceiveTimeout = null;
+ }
}
}
@@ -1164,5 +1171,9 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
return true;
}
+ public boolean hasBatchReceiveTimeout() {
+ return batchReceiveTimeout != null;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ConsumerBase.class);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e25ba0facb..ffd2f68d760 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -555,6 +555,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
expectMoreIncomingMessages();
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
+ triggerBatchReceiveTimeoutTask();
cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
});
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 28d1f6cc7a3..32bdb44ac8c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -44,8 +44,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -94,8 +92,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private final ConsumerStatsRecorder stats;
private UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
@@ -423,8 +419,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new
CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result =
cancellationHandler.createFuture();
- try {
- lock.writeLock().lock();
+ internalPinnedExecutor.execute(() -> {
if (hasEnoughMessagesForBatchReceive()) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
@@ -446,13 +441,11 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
expectMoreIncomingMessages();
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
+ triggerBatchReceiveTimeoutTask();
cancellationHandler.setCancelAction(() ->
pendingBatchReceives.remove(opBatchReceive));
}
resumeReceivingFromPausedConsumersIfNeeded();
- } finally {
- lock.writeLock().unlock();
- }
-
+ });
return result;
}
@@ -695,8 +688,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
- lock.writeLock().lock();
- try {
+ internalPinnedExecutor.execute(() -> {
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
@@ -704,9 +696,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
clearIncomingMessages();
unAckedMessageTracker.clear();
- } finally {
- lock.writeLock().unlock();
- }
+ });
resumeReceivingFromPausedConsumersIfNeeded();
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 5ec56ecfffc..51fba75cd21 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -178,7 +178,7 @@ public class MultiTopicsConsumerImplTest {
// given
MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
CompletableFuture<Messages<byte[]>> future =
consumer.batchReceiveAsync();
- assertTrue(consumer.hasPendingBatchReceive());
+ Awaitility.await().untilAsserted(() ->
assertTrue(consumer.hasPendingBatchReceive()));
// when
future.cancel(true);
// then