This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eed8c74d0ab [fix][test] Fix flaky test: PrometheusMetricsTest.testDuplicateMetricTypeDefinitions (#18077)
eed8c74d0ab is described below
commit eed8c74d0abbc16dabf3f2c624705bdbafee4146
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Oct 18 09:23:35 2022 +0200
[fix][test] Fix flaky test: PrometheusMetricsTest.testDuplicateMetricTypeDefinitions (#18077)
* [fix][test] Fix flaky test: PrometheusMetricsTest.testDuplicateMetricTypeDefinitions
* fix
---
.../main/java/org/apache/pulsar/broker/PulsarService.java | 2 ++
.../pendingack/impl/MLPendingAckStoreProvider.java | 10 ++++++++++
.../impl/MLTransactionMetadataStoreProvider.java | 14 ++++++++++++--
3 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0ade26e6612..f6aec263f13 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -562,6 +562,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (transactionExecutorProvider != null) {
transactionExecutorProvider.shutdownNow();
}
+ MLPendingAckStoreProvider.closeBufferedWriterMetrics();
+ MLTransactionMetadataStoreProvider.closeBufferedWriterMetrics();
if (this.offloaderStats != null) {
this.offloaderStats.close();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index bf2771abaa6..130b485694f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -61,6 +61,16 @@ public class MLPendingAckStoreProvider implements TransactionPendingAckStoreProv
}
}
+ public static void closeBufferedWriterMetrics() {
+ synchronized (MLPendingAckStoreProvider.class){
+ if (bufferedWriterMetrics == DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){
+ return;
+ }
+ bufferedWriterMetrics.close();
+ bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+ }
+ }
+
@Override
public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {
CompletableFuture<PendingAckStore> pendingAckStoreFuture = new
CompletableFuture<>();
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index c11e422d27a..6eb6402c0ee 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -39,17 +39,27 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
public static void initBufferedWriterMetrics(String
brokerAdvertisedAddress){
- if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){
+ if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
return;
}
synchronized (MLTransactionMetadataStoreProvider.class){
- if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS){
+ if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
return;
}
bufferedWriterMetrics = new MLTransactionMetadataStoreBufferedWriterMetrics(brokerAdvertisedAddress);
}
}
+ public static void closeBufferedWriterMetrics() {
+ synchronized (MLTransactionMetadataStoreProvider.class){
+ if (bufferedWriterMetrics == DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
+ return;
+ }
+ bufferedWriterMetrics.close();
+ bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+ }
+ }
+
@Override
public CompletableFuture<TransactionMetadataStore>
openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,