This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be7057a1fd8 [fix][test]: fix flaky test of
ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
be7057a1fd8 is described below
commit be7057a1fd878111e85ec112bac8f0b72350e744
Author: wuxuanqicn <[email protected]>
AuthorDate: Thu Apr 28 11:57:39 2022 +0800
[fix][test]: fix flaky test of
ManagedCursorMetricsTest.testManagedCursorMetrics (#9919) (#14720)
Fixes #9919
### Motivation
We need to make sure the broker has executed all ack commands and updated the
metrics before we generate and check the metrics.
### Modifications
- enable AckReceipt
- await until the ack procedure completes (ACK and ACK_RESPONSE commands)
Co-authored-by: xuanqi.wu <[email protected]>
---
.../broker/stats/ManagedCursorMetricsTest.java | 26 +++++++++++++++++-----
.../org/apache/pulsar/client/impl/ClientCnx.java | 1 +
2 files changed, 22 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 5e20c09fed1..4648ae2fb8f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,22 +18,27 @@
*/
package org.apache.pulsar.broker.stats;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.PulsarTestClient;
import org.apache.pulsar.common.stats.Metrics;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
@Test(groups = "broker")
public class ManagedCursorMetricsTest extends MockedPulsarServiceBaseTest {
@@ -49,6 +54,11 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Override
+ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder)
throws PulsarClientException {
+ return PulsarTestClient.create(clientBuilder);
+ }
+
@Test
public void testManagedCursorMetrics() throws Exception {
final String subName = "my-sub";
@@ -63,14 +73,18 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ PulsarTestClient pulsarClient = (PulsarTestClient) this.pulsarClient;
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
this.pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
+ .isAckReceiptEnabled(true)
.subscribe();
- Producer<byte[]> producer = pulsarClient.newProducer()
+ @Cleanup
+ Producer<byte[]> producer = this.pulsarClient.newProducer()
.topic(topicName)
.create();
@@ -83,6 +97,8 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
producer.send(message.getBytes());
consumer.acknowledge(consumer.receive().getMessageId());
}
+
+ Awaitility.await().until(() ->
pulsarClient.getConnection(topicName).get().getPendingRequests().size() == 0);
metricsList = metrics.generate();
Assert.assertFalse(metricsList.isEmpty());
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistLedgerSucceed"),
0L);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6c23d668236..1edf2e1b3a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -105,6 +105,7 @@ public class ClientCnx extends PulsarHandler {
protected final Authentication authentication;
private State state;
+ @Getter
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends
Object>> pendingRequests =
ConcurrentLongHashMap.<TimedCompletableFuture<? extends
Object>>newBuilder()
.expectedItems(16)