This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f047de204ef [fix][test] Fix flaky MetadataStoreStatsTest and prevent 
certain flakiness in all metric / stat tests (#19329)
f047de204ef is described below

commit f047de204ef16f1233fa732caa715cbdf1af758e
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 26 14:02:33 2023 +0200

    [fix][test] Fix flaky MetadataStoreStatsTest and prevent certain flakiness 
in all metric / stat tests (#19329)
---
 .../pulsar/broker/stats/ConsumerStatsTest.java     |  11 ++-
 .../broker/stats/LedgerOffloaderMetricsTest.java   |  10 ++
 .../broker/stats/ManagedCursorMetricsTest.java     |  13 ++-
 .../broker/stats/ManagedLedgerMetricsTest.java     |  10 ++
 .../broker/stats/MetadataStoreStatsTest.java       | 101 ++++++++++++---------
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  13 ++-
 .../pulsar/broker/stats/SubscriptionStatsTest.java |  10 ++
 .../stats/TransactionBatchWriterMetricsTest.java   |  11 ++-
 .../broker/stats/TransactionMetricsTest.java       |   9 ++
 .../client/api/SimpleProducerConsumerStatTest.java |  10 ++
 .../proxy/server/ProxyPrometheusMetricsTest.java   |  10 ++
 .../apache/pulsar/proxy/server/ProxyStatsTest.java |  10 ++
 12 files changed, 168 insertions(+), 50 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 1b0ee700fd0..fd552766569 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -83,11 +83,20 @@ public class ConsumerStatsTest extends ProducerConsumerBase 
{
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
-        conf.setMaxUnackedMessagesPerConsumer(0);
         super.internalSetup();
         super.producerBaseSetup();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        conf.setMaxUnackedMessagesPerConsumer(0);
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
index 522790fa5fa..a285974f13d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -37,6 +38,15 @@ public class LedgerOffloaderMetricsTest  extends 
BrokerTestBase {
     protected void setup() throws Exception {
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 7f8c71cb139..baa4bea5701 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
@@ -53,9 +54,19 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
         conf.setTopicLevelPoliciesEnabled(false);
         conf.setSystemTopicEnabled(false);
-        super.internalSetup();
+
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
     }
 
     @AfterClass
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index c2b5c3d834c..bec73121e48 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
 import org.apache.pulsar.client.api.Producer;
@@ -52,6 +53,15 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase 
{
         super.baseSetup();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 3ce735b797f..70e58c6b079 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -44,17 +45,26 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
-        conf.setTopicLevelPoliciesEnabled(false);
-        conf.setSystemTopicEnabled(false);
         super.baseSetup();
         AuthenticationProviderToken.resetMetrics();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setSystemTopicEnabled(false);
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by
+        // org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats.close 
method which unregisters metrics
+        // asynchronously. This impacts the execution of the next test method 
if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
-        resetConfig();
     }
 
     @Test
@@ -76,7 +86,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
             producer.newMessage().value(UUID.randomUUID().toString()).send();
         }
 
-        for (;;) {
+        for (int i = 0; i < 100; i++) {
             Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
             if (message == null) {
                 break;
@@ -89,51 +99,53 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         String metricsStr = output.toString();
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = 
PrometheusMetricsTest.parseMetrics(metricsStr);
 
+        String metricsDebugMessage = "Assertion failed with metrics:\n" + 
metricsStr + "\n";
+
         Collection<PrometheusMetricsTest.Metric> opsLatency = 
metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
         Collection<PrometheusMetricsTest.Metric> putBytes = 
metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
 
-        Assert.assertTrue(opsLatency.size() > 1);
-        Assert.assertTrue(putBytes.size() > 1);
+        Assert.assertTrue(opsLatency.size() > 1, metricsDebugMessage);
+        Assert.assertTrue(putBytes.size() > 1, metricsDebugMessage);
 
         for (PrometheusMetricsTest.Metric m : opsLatency) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertNotNull(m.tags.get("status"));
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertNotNull(m.tags.get("status"), metricsDebugMessage);
 
             if (m.tags.get("status").equals("success")) {
                 if (m.tags.get("type").equals("get")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else if (m.tags.get("type").equals("del")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else if (m.tags.get("type").equals("put")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else {
-                    Assert.fail();
+                    Assert.fail(metricsDebugMessage);
                 }
             } else {
                 if (m.tags.get("type").equals("get")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else if (m.tags.get("type").equals("del")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else if (m.tags.get("type").equals("put")) {
-                    Assert.assertTrue(m.value >= 0);
+                    Assert.assertTrue(m.value >= 0, metricsDebugMessage);
                 } else {
-                    Assert.fail();
+                    Assert.fail(metricsDebugMessage);
                 }
             }
         }
         for (PrometheusMetricsTest.Metric m : putBytes) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertTrue(m.value > 0);
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertTrue(m.value >= 0, metricsDebugMessage);
         }
     }
 
@@ -156,7 +168,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
             producer.newMessage().value(UUID.randomUUID().toString()).send();
         }
 
-        for (;;) {
+        for (int i = 0; i < 100; i++) {
             Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
             if (message == null) {
                 break;
@@ -174,49 +186,50 @@ public class MetadataStoreStatsTest extends 
BrokerTestBase {
         Collection<PrometheusMetricsTest.Metric> batchExecuteTime = 
metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
         Collection<PrometheusMetricsTest.Metric> opsPerBatch = 
metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
 
-        Assert.assertTrue(executorQueueSize.size() > 1);
-        Assert.assertTrue(opsWaiting.size() > 1);
-        Assert.assertTrue(batchExecuteTime.size() > 0);
-        Assert.assertTrue(opsPerBatch.size() > 0);
+        String metricsDebugMessage = "Assertion failed with metrics:\n" + 
metricsStr + "\n";
+
+        Assert.assertTrue(executorQueueSize.size() > 1, metricsDebugMessage);
+        Assert.assertTrue(opsWaiting.size() > 1, metricsDebugMessage);
+        Assert.assertTrue(batchExecuteTime.size() > 0, metricsDebugMessage);
+        Assert.assertTrue(opsPerBatch.size() > 0, metricsDebugMessage);
 
         for (PrometheusMetricsTest.Metric m : executorQueueSize) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertTrue(m.value >= 0);
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertTrue(m.value >= 0, metricsDebugMessage);
         }
         for (PrometheusMetricsTest.Metric m : opsWaiting) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertTrue(m.value >= 0);
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertTrue(m.value >= 0, metricsDebugMessage);
         }
 
         for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertTrue(m.value > 0);
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertTrue(m.value >= 0, metricsDebugMessage);
         }
 
         for (PrometheusMetricsTest.Metric m : opsPerBatch) {
-            Assert.assertEquals(m.tags.get("cluster"), "test");
+            Assert.assertEquals(m.tags.get("cluster"), "test", 
metricsDebugMessage);
             String metadataStoreName = m.tags.get("name");
-            Assert.assertNotNull(metadataStoreName);
+            Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
             
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
                     || 
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
-                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
-            Assert.assertTrue(m.value > 0);
+                    || 
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE), 
metricsDebugMessage);
+            Assert.assertTrue(m.value >= 0, metricsDebugMessage);
         }
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index db59942e15c..7eb6afb97cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -92,12 +92,21 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
-        conf.setTopicLevelPoliciesEnabled(false);
-        conf.setSystemTopicEnabled(false);
         super.baseSetup();
         AuthenticationProviderToken.resetMetrics();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setSystemTopicEnabled(false);
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index 50419dcde44..bf9c1d540bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -65,6 +66,15 @@ public class SubscriptionStatsTest extends 
ProducerConsumerBase {
         super.producerBaseSetup();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
index 33e7825e4dc..953c8ce6a93 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
@@ -89,8 +89,9 @@ public class TransactionBatchWriterMetricsTest extends 
MockedPulsarServiceBaseTe
     }
 
     @Override
-    protected void doInitConf() throws Exception {
-        super.doInitConf();
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+
         // enable transaction.
         conf.setSystemTopicEnabled(true);
         conf.setTransactionCoordinatorEnabled(true);
@@ -99,8 +100,14 @@ public class TransactionBatchWriterMetricsTest extends 
MockedPulsarServiceBaseTe
         conf.setTransactionPendingAckBatchedWriteMaxRecords(10);
         conf.setTransactionLogBatchedWriteEnabled(true);
         conf.setTransactionLogBatchedWriteMaxRecords(10);
+
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
     }
 
+
     @Override
     protected PulsarService startBroker(ServiceConfiguration conf) throws 
Exception {
         PulsarService pulsar = startBrokerWithoutAuthorization(conf);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 766e0b90d69..37d1f4b0860 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -73,6 +73,15 @@ public class TransactionMetricsTest extends BrokerTestBase {
         super.baseSetup(serviceConfiguration);
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     protected void afterSetup() throws Exception {
         
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 TenantInfo.builder()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index c8ab95cde9d..2fa3ed9ad01 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.awaitility.Awaitility;
@@ -61,6 +62,15 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         super.producerBaseSetup();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index d7afddc6c15..6948996ad46 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -39,6 +39,7 @@ import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.Response;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -88,6 +89,15 @@ public class ProxyPrometheusMetricsTest extends 
MockedPulsarServiceBaseTest {
         proxyWebServer.start();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @Override
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 9c0d19756a5..140af88aae7 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -35,6 +35,7 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Consumer;
@@ -91,6 +92,15 @@ public class ProxyStatsTest extends 
MockedPulsarServiceBaseTest {
         proxyWebServer.start();
     }
 
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        // wait for shutdown of the broker, this prevents flakiness which 
could be caused by metrics being
+        // unregistered asynchronously. This impacts the execution of the next 
test method if this would be happening.
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
     @Override
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {

Reply via email to