This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f047de204ef [fix][test] Fix flaky MetadataStoreStatsTest and prevent
certain flakiness in all metric / stat tests (#19329)
f047de204ef is described below
commit f047de204ef16f1233fa732caa715cbdf1af758e
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 26 14:02:33 2023 +0200
[fix][test] Fix flaky MetadataStoreStatsTest and prevent certain flakiness
in all metric / stat tests (#19329)
---
.../pulsar/broker/stats/ConsumerStatsTest.java | 11 ++-
.../broker/stats/LedgerOffloaderMetricsTest.java | 10 ++
.../broker/stats/ManagedCursorMetricsTest.java | 13 ++-
.../broker/stats/ManagedLedgerMetricsTest.java | 10 ++
.../broker/stats/MetadataStoreStatsTest.java | 101 ++++++++++++---------
.../pulsar/broker/stats/PrometheusMetricsTest.java | 13 ++-
.../pulsar/broker/stats/SubscriptionStatsTest.java | 10 ++
.../stats/TransactionBatchWriterMetricsTest.java | 11 ++-
.../broker/stats/TransactionMetricsTest.java | 9 ++
.../client/api/SimpleProducerConsumerStatTest.java | 10 ++
.../proxy/server/ProxyPrometheusMetricsTest.java | 10 ++
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 10 ++
12 files changed, 168 insertions(+), 50 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 1b0ee700fd0..fd552766569 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -83,11 +83,20 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
@BeforeMethod
@Override
protected void setup() throws Exception {
- conf.setMaxUnackedMessagesPerConsumer(0);
super.internalSetup();
super.producerBaseSetup();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ conf.setMaxUnackedMessagesPerConsumer(0);
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
index 522790fa5fa..a285974f13d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -37,6 +38,15 @@ public class LedgerOffloaderMetricsTest extends
BrokerTestBase {
protected void setup() throws Exception {
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index 7f8c71cb139..baa4bea5701 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
@@ -53,9 +54,19 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
@BeforeClass
@Override
protected void setup() throws Exception {
+ super.internalSetup();
+ }
+
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
conf.setTopicLevelPoliciesEnabled(false);
conf.setSystemTopicEnabled(false);
- super.internalSetup();
+
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
}
@AfterClass
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index c2b5c3d834c..bec73121e48 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
import org.apache.pulsar.client.api.Producer;
@@ -52,6 +53,15 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase
{
super.baseSetup();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 3ce735b797f..70e58c6b079 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
@@ -44,17 +45,26 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
- conf.setTopicLevelPoliciesEnabled(false);
- conf.setSystemTopicEnabled(false);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ conf.setTopicLevelPoliciesEnabled(false);
+ conf.setSystemTopicEnabled(false);
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by
+ // org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats.close
method which unregisters metrics
+ // asynchronously. This impacts the execution of the next test method
if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
- resetConfig();
}
@Test
@@ -76,7 +86,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}
- for (;;) {
+ for (int i = 0; i < 100; i++) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
@@ -89,51 +99,53 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
String metricsStr = output.toString();
Multimap<String, PrometheusMetricsTest.Metric> metricsMap =
PrometheusMetricsTest.parseMetrics(metricsStr);
+ String metricsDebugMessage = "Assertion failed with metrics:\n" +
metricsStr + "\n";
+
Collection<PrometheusMetricsTest.Metric> opsLatency =
metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> putBytes =
metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
- Assert.assertTrue(opsLatency.size() > 1);
- Assert.assertTrue(putBytes.size() > 1);
+ Assert.assertTrue(opsLatency.size() > 1, metricsDebugMessage);
+ Assert.assertTrue(putBytes.size() > 1, metricsDebugMessage);
for (PrometheusMetricsTest.Metric m : opsLatency) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertNotNull(m.tags.get("status"));
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertNotNull(m.tags.get("status"), metricsDebugMessage);
if (m.tags.get("status").equals("success")) {
if (m.tags.get("type").equals("get")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("del")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("put")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else {
- Assert.fail();
+ Assert.fail(metricsDebugMessage);
}
} else {
if (m.tags.get("type").equals("get")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("del")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else if (m.tags.get("type").equals("put")) {
- Assert.assertTrue(m.value >= 0);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
} else {
- Assert.fail();
+ Assert.fail(metricsDebugMessage);
}
}
}
for (PrometheusMetricsTest.Metric m : putBytes) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertTrue(m.value > 0);
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
}
@@ -156,7 +168,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}
- for (;;) {
+ for (int i = 0; i < 100; i++) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
@@ -174,49 +186,50 @@ public class MetadataStoreStatsTest extends
BrokerTestBase {
Collection<PrometheusMetricsTest.Metric> batchExecuteTime =
metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> opsPerBatch =
metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
- Assert.assertTrue(executorQueueSize.size() > 1);
- Assert.assertTrue(opsWaiting.size() > 1);
- Assert.assertTrue(batchExecuteTime.size() > 0);
- Assert.assertTrue(opsPerBatch.size() > 0);
+ String metricsDebugMessage = "Assertion failed with metrics:\n" +
metricsStr + "\n";
+
+ Assert.assertTrue(executorQueueSize.size() > 1, metricsDebugMessage);
+ Assert.assertTrue(opsWaiting.size() > 1, metricsDebugMessage);
+ Assert.assertTrue(batchExecuteTime.size() > 0, metricsDebugMessage);
+ Assert.assertTrue(opsPerBatch.size() > 0, metricsDebugMessage);
for (PrometheusMetricsTest.Metric m : executorQueueSize) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertTrue(m.value >= 0);
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
for (PrometheusMetricsTest.Metric m : opsWaiting) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertTrue(m.value >= 0);
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertTrue(m.value > 0);
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
for (PrometheusMetricsTest.Metric m : opsPerBatch) {
- Assert.assertEquals(m.tags.get("cluster"), "test");
+ Assert.assertEquals(m.tags.get("cluster"), "test",
metricsDebugMessage);
String metadataStoreName = m.tags.get("name");
- Assert.assertNotNull(metadataStoreName);
+ Assert.assertNotNull(metadataStoreName, metricsDebugMessage);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
||
metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
- ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
- Assert.assertTrue(m.value > 0);
+ ||
metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE),
metricsDebugMessage);
+ Assert.assertTrue(m.value >= 0, metricsDebugMessage);
}
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index db59942e15c..7eb6afb97cd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -92,12 +92,21 @@ public class PrometheusMetricsTest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
- conf.setTopicLevelPoliciesEnabled(false);
- conf.setSystemTopicEnabled(false);
super.baseSetup();
AuthenticationProviderToken.resetMetrics();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ conf.setTopicLevelPoliciesEnabled(false);
+ conf.setSystemTopicEnabled(false);
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
index 50419dcde44..bf9c1d540bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryFilterSupport;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -65,6 +66,15 @@ public class SubscriptionStatsTest extends
ProducerConsumerBase {
super.producerBaseSetup();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
index 33e7825e4dc..953c8ce6a93 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionBatchWriterMetricsTest.java
@@ -89,8 +89,9 @@ public class TransactionBatchWriterMetricsTest extends
MockedPulsarServiceBaseTe
}
@Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+
// enable transaction.
conf.setSystemTopicEnabled(true);
conf.setTransactionCoordinatorEnabled(true);
@@ -99,8 +100,14 @@ public class TransactionBatchWriterMetricsTest extends
MockedPulsarServiceBaseTe
conf.setTransactionPendingAckBatchedWriteMaxRecords(10);
conf.setTransactionLogBatchedWriteEnabled(true);
conf.setTransactionLogBatchedWriteMaxRecords(10);
+
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
}
+
@Override
protected PulsarService startBroker(ServiceConfiguration conf) throws
Exception {
PulsarService pulsar = startBrokerWithoutAuthorization(conf);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index 766e0b90d69..37d1f4b0860 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -73,6 +73,15 @@ public class TransactionMetricsTest extends BrokerTestBase {
super.baseSetup(serviceConfiguration);
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
protected void afterSetup() throws Exception {
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
TenantInfo.builder()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index c8ab95cde9d..2fa3ed9ad01 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.awaitility.Awaitility;
@@ -61,6 +62,15 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
super.producerBaseSetup();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
index d7afddc6c15..6948996ad46 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPrometheusMetricsTest.java
@@ -39,6 +39,7 @@ import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -88,6 +89,15 @@ public class ProxyPrometheusMetricsTest extends
MockedPulsarServiceBaseTest {
proxyWebServer.start();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 9c0d19756a5..140af88aae7 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -35,6 +35,7 @@ import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Consumer;
@@ -91,6 +92,15 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
proxyWebServer.start();
}
+ @Override
+ protected ServiceConfiguration getDefaultConf() {
+ ServiceConfiguration conf = super.getDefaultConf();
+ // wait for shutdown of the broker, this prevents flakiness which
could be caused by metrics being
+ // unregistered asynchronously. This impacts the execution of the next
test method if this would be happening.
+ conf.setBrokerShutdownTimeoutMs(5000L);
+ return conf;
+ }
+
@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {