This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new be7057a1fd8 [fix][test]: fix flaky test of 
ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
be7057a1fd8 is described below

commit be7057a1fd878111e85ec112bac8f0b72350e744
Author: wuxuanqicn <[email protected]>
AuthorDate: Thu Apr 28 11:57:39 2022 +0800

    [fix][test]: fix flaky test of 
ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
    
    Fixes #9919
    
    ### Motivation
    
    We need to make sure the broker has executed all ack commands and updated 
the metrics before we generate and check the metrics.
    
    ### Modifications
    
    - enable AckReceipt
    - await until the ack procedure completes (ACK and ACK_RESPONSE commands)
    
    Co-authored-by: xuanqi.wu <[email protected]>
---
 .../broker/stats/ManagedCursorMetricsTest.java     | 26 +++++++++++++++++-----
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  1 +
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 5e20c09fed1..4648ae2fb8f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,22 +18,27 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
 import org.apache.pulsar.common.stats.Metrics;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 @Test(groups = "broker")
 public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
 
@@ -49,6 +54,11 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Override
+    protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) 
throws PulsarClientException {
+        return PulsarTestClient.create(clientBuilder);
+    }
+
     @Test
     public void testManagedCursorMetrics() throws Exception {
         final String subName = "my-sub";
@@ -63,14 +73,18 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         metricsList = metrics.generate();
         Assert.assertTrue(metricsList.isEmpty());
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+        PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
this.pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .ackTimeout(1, TimeUnit.SECONDS)
                 .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
                 .subscribe();
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
                 .topic(topicName)
                 .create();
 
@@ -83,6 +97,8 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
             producer.send(message.getBytes());
             consumer.acknowledge(consumer.receive().getMessageId());
         }
+
+        Awaitility.await().until(() -> 
pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
         metricsList = metrics.generate();
         Assert.assertFalse(metricsList.isEmpty());
         
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"),
 0L);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c23d668236..1edf2e1b3a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -105,6 +105,7 @@ public class ClientCnx extends PulsarHandler {
     protected final Authentication authentication;
     private State state;
 
+    @Getter
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends 
Object>> pendingRequests =
             ConcurrentLongHashMap.<TimedCompletableFuture<? extends 
Object>>newBuilder()
                     .expectedItems(16)

Reply via email to