This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 91385337a33 [improve][java-client] Only trigger the batch receive 
timeout when having pending batch receives requests (#16160)
91385337a33 is described below

commit 91385337a33d442b5cfb6f61318736627fd1f8bc
Author: lipenghui <[email protected]>
AuthorDate: Thu Jun 23 09:04:25 2022 +0800

    [improve][java-client] Only trigger the batch receive timeout when having 
pending batch receives requests (#16160)
    
    The consumer will apply the default batch receive policy even if the user 
will not use the batch receive API.
    
    
https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
    
    This will consume lots of CPU if the client have many consumers (100k 
consumers)
    
    The Pulsar perf tool can also reproduce the problem if run the test with 
many consumers
    
    If there is no pending batch receive operation for a consumer, no need to 
trigger the
    batch timeout task periodically. We can only start the timeout check after 
adding batch
    receive request to pending request queue.
    
    Remove the lock in MultiTopicsConsumerImpl as #10352 does
    
    Added new test to verify the batch receive timeout task will not start if 
no batch
    receive request
    
    (cherry picked from commit a0ccdc96bb05d19651f3778c23b89425d516d77a)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 47 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 22 +++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 +
 .../client/impl/MultiTopicsConsumerImpl.java       | 20 +++------
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 5 files changed, 71 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 1473f280753..19cb25664b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api;
 
 import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -48,6 +50,14 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "partitioned")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] {
+            { true },
+            { false }
+        };
+    }
+
     @DataProvider(name = "batchReceivePolicy")
     public Object[][] batchReceivePolicyProvider() {
         return new Object[][] {
@@ -425,6 +435,43 @@ public class ConsumerBatchReceiveTest extends 
ProducerConsumerBase {
         latch.await();
     }
 
+    @Test(dataProvider = "partitioned")
+    public void testBatchReceiveTimeoutTask(boolean partitioned) throws 
Exception {
+        final String topic = "persistent://my-property/my-ns/batch-receive-" + 
UUID.randomUUID();
+
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .receiverQueueSize(1)
+                .batchReceivePolicy(BatchReceivePolicy.builder()
+                        .maxNumBytes(1024 * 1024)
+                        .maxNumMessages(1)
+                        .timeout(5, TimeUnit.SECONDS)
+                        .build())
+                .subscribe();
+        
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        final int messagesToSend = 500;
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        for (int i = 0; i < 100; i++) {
+            Assert.assertNotNull(consumer.receive());
+        }
+        
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        for (int i = 0; i < 400; i++) {
+            Messages<String> batchReceived = consumer.batchReceive();
+            Assert.assertEquals(batchReceived.size(), 1);
+        }
+        Awaitility.await().untilAsserted(() -> 
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+        Assert.assertEquals(consumer.batchReceive().size(), 0);
+        Awaitility.await().untilAsserted(() -> 
Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+    }
+
 
     private void 
receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> 
consumer,
                                                                             
BatchReceivePolicy batchReceivePolicy,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index c53d49ad4bd..49e9acac056 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -134,9 +134,12 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         } else {
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
+    }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
-            batchReceiveTimeout = 
client.timer().newTimeout(this::pendingBatchReceiveTask, 
batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 
0) {
+            batchReceiveTimeout = 
client.timer().newTimeout(this::pendingBatchReceiveTask,
+                    batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
     }
 
@@ -865,7 +868,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
 
         long timeToWaitMs;
-
+        boolean hasPendingReceives = false;
         synchronized (this) {
             // If it's closing/closed we need to ignore this timeout and not 
schedule next timeout.
             if (getState() == State.Closing || getState() == State.Closed) {
@@ -902,13 +905,18 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
                 } else {
                     // The diff is greater than zero, set the timeout to the 
diff value
                     timeToWaitMs = diff;
+                    hasPendingReceives = true;
                     break;
                 }
 
                 opBatchReceive = pendingBatchReceives.peek();
             }
-            batchReceiveTimeout = 
client.timer().newTimeout(this::pendingBatchReceiveTask,
-                    timeToWaitMs, TimeUnit.MILLISECONDS);
+            if (hasPendingReceives) {
+                batchReceiveTimeout = 
client.timer().newTimeout(this::pendingBatchReceiveTask,
+                        timeToWaitMs, TimeUnit.MILLISECONDS);
+            } else {
+                batchReceiveTimeout = null;
+            }
         }
     }
 
@@ -1034,5 +1042,9 @@ public abstract class ConsumerBase<T> extends 
HandlerState implements Consumer<T
         return executor;
     }
 
+    public boolean hasBatchReceiveTimeout() {
+        return batchReceiveTimeout != null;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5381b280330..89e434d41d4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -500,6 +500,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> 
pendingBatchReceives.remove(opBatchReceive));
             }
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index b2de0e3b92c..2dd6bb9e304 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -43,8 +43,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -101,8 +99,6 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     private volatile Timeout partitionsAutoUpdateTimeout = null;
     TopicsPartitionChangedListener topicsPartitionChangedListener;
     CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private final ConsumerStatsRecorder stats;
     private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
@@ -388,8 +384,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new 
CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = 
cancellationHandler.createFuture();
-        try {
-            lock.writeLock().lock();
+        internalPinnedExecutor.execute(() -> {
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -406,13 +401,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> 
pendingBatchReceives.remove(opBatchReceive));
             }
             resumeReceivingFromPausedConsumersIfNeeded();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
+        });
         return result;
     }
 
@@ -640,17 +633,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        lock.writeLock().lock();
-        try {
+        internalPinnedExecutor.execute(() -> {
             consumers.values().stream().forEach(consumer -> {
                 consumer.redeliverUnacknowledgedMessages();
                 consumer.unAckedChunkedMessageIdSequenceMap.clear();
             });
             clearIncomingMessages();
             unAckedMessageTracker.clear();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        });
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 38e66807228..fe818069408 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -178,7 +178,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Messages<byte[]>> future = 
consumer.batchReceiveAsync();
-        assertTrue(consumer.hasPendingBatchReceive());
+        Awaitility.await().untilAsserted(() -> 
assertTrue(consumer.hasPendingBatchReceive()));
         // when
         future.cancel(true);
         // then

Reply via email to