This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4e535cb3f4a [feat][broker] PIP-264: Add transaction metrics (#22970)
4e535cb3f4a is described below
commit 4e535cb3f4a3482b0d5dc5a3a0a63c87490704e3
Author: Dragos Misca <[email protected]>
AuthorDate: Thu Jun 27 02:54:43 2024 -0700
[feat][broker] PIP-264: Add transaction metrics (#22970)
---
.../org/apache/pulsar/broker/PulsarService.java | 15 ++++
.../broker/service/PersistentTopicAttributes.java | 30 ++++++++
.../service/persistent/PersistentSubscription.java | 7 +-
.../service/persistent/PersistentTopicMetrics.java | 14 +++-
.../broker/stats/OpenTelemetryTopicStats.java | 27 ++++++-
.../OpenTelemetryTransactionCoordinatorStats.java | 87 ++++++++++++++++++++++
...enTelemetryTransactionPendingAckStoreStats.java | 72 ++++++++++++++++++
.../buffer/TransactionBufferClientStats.java | 7 +-
.../buffer/impl/TransactionBufferClientImpl.java | 9 ++-
.../impl/TransactionBufferClientStatsImpl.java | 61 +++++++++++++--
.../transaction/pendingack/PendingAckHandle.java | 7 ++
.../pendingack/PendingAckHandleAttributes.java | 63 ++++++++++++++++
.../pendingack/PendingAckHandleStats.java | 7 ++
.../pendingack/impl/PendingAckHandleDisabled.java | 6 ++
.../pendingack/impl/PendingAckHandleImpl.java | 28 ++++---
.../pendingack/impl/PendingAckHandleStatsImpl.java | 56 +++++++++++++-
.../pulsar/broker/transaction/TransactionTest.java | 24 +++++-
.../buffer/TopicTransactionBufferTest.java | 22 +++++-
.../pendingack/PendingAckPersistentTest.java | 40 ++++++++++
.../opentelemetry/OpenTelemetryAttributes.java | 33 +++++++-
pulsar-transaction/coordinator/pom.xml | 6 ++
.../coordinator/TransactionMetadataStore.java | 9 +++
.../TransactionMetadataStoreAttributes.java | 56 ++++++--------
.../impl/InMemTransactionMetadataStore.java | 16 ++++
.../impl/MLTransactionMetadataStore.java | 16 ++++
25 files changed, 640 insertions(+), 78 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0d8bc571c57..848484fe376 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,8 @@ import
org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
+import
org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -263,6 +265,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private OpenTelemetryConsumerStats openTelemetryConsumerStats;
private OpenTelemetryProducerStats openTelemetryProducerStats;
private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+ private OpenTelemetryTransactionCoordinatorStats
openTelemetryTransactionCoordinatorStats;
+ private OpenTelemetryTransactionPendingAckStoreStats
openTelemetryTransactionPendingAckStoreStats;
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
@@ -684,6 +688,14 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();
+ if (openTelemetryTransactionPendingAckStoreStats != null) {
+ openTelemetryTransactionPendingAckStoreStats.close();
+ openTelemetryTransactionPendingAckStoreStats = null;
+ }
+ if (openTelemetryTransactionCoordinatorStats != null) {
+ openTelemetryTransactionCoordinatorStats.close();
+ openTelemetryTransactionCoordinatorStats = null;
+ }
if (openTelemetryReplicatorStats != null) {
openTelemetryReplicatorStats.close();
openTelemetryReplicatorStats = null;
@@ -996,6 +1008,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.newProvider(config.getTransactionBufferProviderClassName());
transactionPendingAckStoreProvider =
TransactionPendingAckStoreProvider
.newProvider(config.getTransactionPendingAckStoreProviderClassName());
+
+ openTelemetryTransactionCoordinatorStats = new
OpenTelemetryTransactionCoordinatorStats(this);
+ openTelemetryTransactionPendingAckStoreStats = new
OpenTelemetryTransactionPendingAckStoreStats(this);
}
this.metricsGenerator = new MetricsGenerator(this);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
index 048edafe884..51f5bdb354d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
@@ -36,6 +36,11 @@ public class PersistentTopicAttributes extends
TopicAttributes {
private final Attributes transactionCommittedAttributes;
private final Attributes transactionAbortedAttributes;
+ private final Attributes transactionBufferClientCommitSucceededAttributes;
+ private final Attributes transactionBufferClientCommitFailedAttributes;
+ private final Attributes transactionBufferClientAbortSucceededAttributes;
+ private final Attributes transactionBufferClientAbortFailedAttributes;
+
public PersistentTopicAttributes(TopicName topicName) {
super(topicName);
@@ -61,6 +66,31 @@ public class PersistentTopicAttributes extends
TopicAttributes {
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
.build();
+ transactionBufferClientCommitSucceededAttributes = Attributes.builder()
+ .putAll(commonAttributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+ .build();
+ transactionBufferClientCommitFailedAttributes = Attributes.builder()
+ .putAll(commonAttributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+ .build();
+ transactionBufferClientAbortSucceededAttributes = Attributes.builder()
+ .putAll(commonAttributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+ .build();
+ transactionBufferClientAbortFailedAttributes = Attributes.builder()
+ .putAll(commonAttributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+ .build();
+
compactionSuccessAttributes = Attributes.builder()
.putAll(commonAttributes)
.putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 7da339a420c..a1d51668ca8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -128,6 +129,7 @@ public class PersistentSubscription extends
AbstractSubscription {
private static final Map<String, Long>
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Map.of();
private volatile ReplicatedSubscriptionSnapshotCache
replicatedSubscriptionSnapshotCache;
+ @Getter
private final PendingAckHandle pendingAckHandle;
private volatile Map<String, String> subscriptionProperties;
private volatile CompletableFuture<Void> fenceFuture;
@@ -1439,11 +1441,6 @@ public class PersistentSubscription extends
AbstractSubscription {
return cursor;
}
- @VisibleForTesting
- public PendingAckHandle getPendingAckHandle() {
- return pendingAckHandle;
- }
-
public void syncBatchPositionBitSetForPendingAck(Position position) {
this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
index f79d053a979..d8ebece7a51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopicMetrics.java
@@ -21,12 +21,13 @@ package org.apache.pulsar.broker.service.persistent;
import java.util.concurrent.atomic.LongAdder;
import lombok.Getter;
-@SuppressWarnings("LombokGetterMayBeUsed")
+@Getter
public class PersistentTopicMetrics {
- @Getter
private final BacklogQuotaMetrics backlogQuotaMetrics = new
BacklogQuotaMetrics();
+ private final TransactionBufferClientMetrics
transactionBufferClientMetrics = new TransactionBufferClientMetrics();
+
public static class BacklogQuotaMetrics {
private final LongAdder timeBasedBacklogQuotaExceededEvictionCount =
new LongAdder();
private final LongAdder sizeBasedBacklogQuotaExceededEvictionCount =
new LongAdder();
@@ -47,4 +48,13 @@ public class PersistentTopicMetrics {
return timeBasedBacklogQuotaExceededEvictionCount.longValue();
}
}
+
+ @Getter
+ public static class TransactionBufferClientMetrics {
+ private final LongAdder commitSucceededCount = new LongAdder();
+ private final LongAdder commitFailedCount = new LongAdder();
+
+ private final LongAdder abortSucceededCount = new LongAdder();
+ private final LongAdder abortFailedCount = new LongAdder();
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
index b6d3f089077..0274cb7a7d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
@@ -149,6 +149,12 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
public static final String TRANSACTION_COUNTER =
"pulsar.broker.topic.transaction.count";
private final ObservableLongMeasurement transactionCounter;
+ // Replaces ['pulsar_txn_tb_client_abort_failed_total',
'pulsar_txn_tb_client_commit_failed_total',
+ // 'pulsar_txn_tb_client_abort_latency',
'pulsar_txn_tb_client_commit_latency']
+ public static final String TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER =
+ "pulsar.broker.topic.transaction.buffer.client.operation.count";
+ private final ObservableLongMeasurement
transactionBufferClientOperationCounter;
+
// Replaces pulsar_subscription_delayed
public static final String DELAYED_SUBSCRIPTION_COUNTER =
"pulsar.broker.topic.subscription.delayed.entry.count";
private final ObservableLongMeasurement delayedSubscriptionCounter;
@@ -333,6 +339,12 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
.setDescription("The number of transactions on this topic.")
.buildObserver();
+ transactionBufferClientOperationCounter = meter
+ .counterBuilder(TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of operations on the transaction
buffer client.")
+ .buildObserver();
+
delayedSubscriptionCounter = meter
.upDownCounterBuilder(DELAYED_SUBSCRIPTION_COUNTER)
.setUnit("{entry}")
@@ -371,6 +383,7 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
compactionEntriesCounter,
compactionBytesCounter,
transactionCounter,
+ transactionBufferClientOperationCounter,
delayedSubscriptionCounter);
}
@@ -399,6 +412,8 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
}
if (topic instanceof PersistentTopic persistentTopic) {
+ var persistentTopicMetrics =
persistentTopic.getPersistentTopicMetrics();
+
var persistentTopicAttributes =
persistentTopic.getTopicAttributes();
var managedLedger = persistentTopic.getManagedLedger();
var managedLedgerStats =
persistentTopic.getManagedLedger().getStats();
@@ -416,7 +431,7 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(),
attributes);
backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(),
attributes);
- var backlogQuotaMetrics =
persistentTopic.getPersistentTopicMetrics().getBacklogQuotaMetrics();
+ var backlogQuotaMetrics =
persistentTopicMetrics.getBacklogQuotaMetrics();
backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(),
persistentTopicAttributes.getSizeBasedQuotaAttributes());
backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(),
@@ -430,6 +445,16 @@ public class OpenTelemetryTopicStats implements
AutoCloseable {
transactionCounter.record(txnBuffer.getAbortedTxnCount(),
persistentTopicAttributes.getTransactionAbortedAttributes());
+ var txnBufferClientMetrics =
persistentTopicMetrics.getTransactionBufferClientMetrics();
+
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitSucceededCount().sum(),
+
persistentTopicAttributes.getTransactionBufferClientCommitSucceededAttributes());
+
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitFailedCount().sum(),
+
persistentTopicAttributes.getTransactionBufferClientCommitFailedAttributes());
+
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortSucceededCount().sum(),
+
persistentTopicAttributes.getTransactionBufferClientAbortSucceededAttributes());
+
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortFailedCount().sum(),
+
persistentTopicAttributes.getTransactionBufferClientAbortFailedAttributes());
+
Optional.ofNullable(pulsar.getNullableCompactor())
.map(Compactor::getStats)
.flatMap(compactorMXBean ->
compactorMXBean.getCompactionRecordForTopic(topic.getName()))
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java
new file mode 100644
index 00000000000..ab73b2390b3
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionCoordinatorStats.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+
+public class OpenTelemetryTransactionCoordinatorStats implements AutoCloseable
{
+
+ // Replaces ['pulsar_txn_aborted_total',
+ // 'pulsar_txn_committed_total',
+ // 'pulsar_txn_created_total',
+ // 'pulsar_txn_timeout_total',
+ // 'pulsar_txn_active_count']
+ public static final String TRANSACTION_COUNTER =
"pulsar.broker.transaction.coordinator.transaction.count";
+ private final ObservableLongMeasurement transactionCounter;
+
+ // Replaces pulsar_txn_append_log_total
+ public static final String APPEND_LOG_COUNTER =
"pulsar.broker.transaction.coordinator.append.log.count";
+ private final ObservableLongMeasurement appendLogCounter;
+
+ private final BatchCallback batchCallback;
+
+ public OpenTelemetryTransactionCoordinatorStats(PulsarService pulsar) {
+ var meter = pulsar.getOpenTelemetry().getMeter();
+
+ transactionCounter = meter
+ .upDownCounterBuilder(TRANSACTION_COUNTER)
+ .setUnit("{transaction}")
+ .setDescription("The number of transactions handled by the
coordinator.")
+ .buildObserver();
+
+ appendLogCounter = meter
+ .counterBuilder(APPEND_LOG_COUNTER)
+ .setUnit("{entry}")
+ .setDescription("The number of transaction metadata entries
appended by the coordinator.")
+ .buildObserver();
+
+ batchCallback = meter.batchCallback(() -> {
+ var transactionMetadataStoreService =
pulsar.getTransactionMetadataStoreService();
+ // Avoid NPE during Pulsar shutdown.
+ if (transactionMetadataStoreService != null) {
+ transactionMetadataStoreService.getStores()
+ .values()
+
.forEach(this::recordMetricsForTransactionMetadataStore);
+ }
+ },
+ transactionCounter,
+ appendLogCounter);
+ }
+
+ @Override
+ public void close() {
+ batchCallback.close();
+ }
+
+ private void
recordMetricsForTransactionMetadataStore(TransactionMetadataStore
transactionMetadataStore) {
+ var attributes = transactionMetadataStore.getAttributes();
+ var stats = transactionMetadataStore.getMetadataStoreStats();
+
+ transactionCounter.record(stats.getAbortedCount(),
attributes.getTxnAbortedAttributes());
+ transactionCounter.record(stats.getActives(),
attributes.getTxnActiveAttributes());
+ transactionCounter.record(stats.getCommittedCount(),
attributes.getTxnCommittedAttributes());
+ transactionCounter.record(stats.getCreatedCount(),
attributes.getTxnCreatedAttributes());
+ transactionCounter.record(stats.getTimeoutCount(),
attributes.getTxnTimeoutAttributes());
+
+ appendLogCounter.record(stats.getAppendLogCount(),
attributes.getCommonAttributes());
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java
new file mode 100644
index 00000000000..562ad56e44d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTransactionPendingAckStoreStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.ObservableLongCounter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+
+public class OpenTelemetryTransactionPendingAckStoreStats implements
AutoCloseable {
+
+ // Replaces ['pulsar_txn_tp_committed_count_total',
'pulsar_txn_tp_aborted_count_total']
+ public static final String ACK_COUNTER =
"pulsar.broker.transaction.pending.ack.store.transaction.count";
+ private final ObservableLongCounter ackCounter;
+
+ public OpenTelemetryTransactionPendingAckStoreStats(PulsarService pulsar) {
+ var meter = pulsar.getOpenTelemetry().getMeter();
+
+ ackCounter = meter
+ .counterBuilder(ACK_COUNTER)
+ .setUnit("{transaction}")
+ .setDescription("The number of transactions handled by the
persistent ack store.")
+ .buildWithCallback(measurement -> pulsar.getBrokerService()
+ .getTopics()
+ .values()
+ .stream()
+ .filter(topicFuture -> topicFuture.isDone() &&
!topicFuture.isCompletedExceptionally())
+ .map(CompletableFuture::join)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .filter(Topic::isPersistent)
+ .map(Topic::getSubscriptions)
+ .forEach(subs -> subs.forEach((__, sub) ->
recordMetricsForSubscription(measurement, sub))));
+ }
+
+ @Override
+ public void close() {
+ ackCounter.close();
+ }
+
+ private void recordMetricsForSubscription(ObservableLongMeasurement
measurement, Subscription subscription) {
+ assert subscription instanceof PersistentSubscription; // The topics
have already been filtered for persistence.
+ var stats = ((PersistentSubscription)
subscription).getPendingAckHandle().getPendingAckHandleStats();
+ if (stats != null) {
+ var attributes = stats.getAttributes();
+ measurement.record(stats.getCommitSuccessCount(),
attributes.getCommitSuccessAttributes());
+ measurement.record(stats.getCommitFailedCount(),
attributes.getCommitFailureAttributes());
+ measurement.record(stats.getAbortSuccessCount(),
attributes.getAbortSuccessAttributes());
+ measurement.record(stats.getAbortFailedCount(),
attributes.getAbortFailureAttributes());
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
index 8fda233ff1d..c21b212f981 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
@@ -34,10 +35,10 @@ public interface TransactionBufferClientStats {
void close();
- static TransactionBufferClientStats create(boolean exposeTopicMetrics,
TransactionBufferHandler handler,
- boolean enableTxnCoordinator) {
+ static TransactionBufferClientStats create(PulsarService pulsarService,
boolean exposeTopicMetrics,
+ TransactionBufferHandler
handler, boolean enableTxnCoordinator) {
return enableTxnCoordinator
- ?
TransactionBufferClientStatsImpl.getInstance(exposeTopicMetrics, handler) :
NOOP;
+ ? TransactionBufferClientStatsImpl.getInstance(pulsarService,
exposeTopicMetrics, handler) : NOOP;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
index 382d640ca86..96ad0203900 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -39,10 +39,11 @@ public class TransactionBufferClientImpl implements
TransactionBufferClient {
private final TransactionBufferHandler tbHandler;
private final TransactionBufferClientStats stats;
- private TransactionBufferClientImpl(TransactionBufferHandler tbHandler,
boolean exposeTopicLevelMetrics,
- boolean enableTxnCoordinator) {
+ private TransactionBufferClientImpl(PulsarService pulsarService,
TransactionBufferHandler tbHandler,
+ boolean exposeTopicLevelMetrics,
boolean enableTxnCoordinator) {
this.tbHandler = tbHandler;
- this.stats =
TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler,
enableTxnCoordinator);
+ this.stats = TransactionBufferClientStats.create(pulsarService,
exposeTopicLevelMetrics, tbHandler,
+ enableTxnCoordinator);
}
public static TransactionBufferClient create(PulsarService pulsarService,
HashedWheelTimer timer,
@@ -53,7 +54,7 @@ public class TransactionBufferClientImpl implements
TransactionBufferClient {
ServiceConfiguration config = pulsarService.getConfig();
boolean exposeTopicLevelMetrics =
config.isExposeTopicLevelMetricsInPrometheus();
boolean enableTxnCoordinator =
config.isTransactionCoordinatorEnabled();
- return new TransactionBufferClientImpl(handler,
exposeTopicLevelMetrics, enableTxnCoordinator);
+ return new TransactionBufferClientImpl(pulsarService, handler,
exposeTopicLevelMetrics, enableTxnCoordinator);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
index a447f707893..4f1c2ca30cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java
@@ -18,31 +18,55 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.NonNull;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopicMetrics;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import
org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
public final class TransactionBufferClientStatsImpl implements
TransactionBufferClientStats {
private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999,
0.9999, 1};
private final AtomicBoolean closed = new AtomicBoolean(false);
+ @PulsarDeprecatedMetric(newMetricName =
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
private final Counter abortFailed;
+ @PulsarDeprecatedMetric(newMetricName =
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
private final Counter commitFailed;
+ @PulsarDeprecatedMetric(newMetricName =
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
private final Summary abortLatency;
+ @PulsarDeprecatedMetric(newMetricName =
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER)
private final Summary commitLatency;
+
+ public static final String PENDING_TRANSACTION_COUNTER =
"pulsar.broker.transaction.buffer.client.pending.count";
+ private final ObservableLongUpDownCounter pendingTransactionCounter;
+
+ @PulsarDeprecatedMetric(newMetricName = PENDING_TRANSACTION_COUNTER)
private final Gauge pendingRequests;
private final boolean exposeTopicLevelMetrics;
+ private final BrokerService brokerService;
+
private static TransactionBufferClientStats instance;
- private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
- TransactionBufferHandler handler)
{
+ private TransactionBufferClientStatsImpl(@NonNull PulsarService
pulsarService,
+ boolean exposeTopicLevelMetrics,
+ @NonNull TransactionBufferHandler
handler) {
+ this.brokerService =
Objects.requireNonNull(pulsarService.getBrokerService());
this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
String[] labelNames = exposeTopicLevelMetrics
? new String[]{"namespace", "topic"} : new
String[]{"namespace"};
@@ -63,9 +87,14 @@ public final class TransactionBufferClientStatsImpl
implements TransactionBuffer
.setChild(new Gauge.Child() {
@Override
public double get() {
- return null == handler ? 0 :
handler.getPendingRequestsCount();
+ return handler.getPendingRequestsCount();
}
});
+ this.pendingTransactionCounter =
pulsarService.getOpenTelemetry().getMeter()
+ .upDownCounterBuilder(PENDING_TRANSACTION_COUNTER)
+ .setDescription("The number of pending transactions in the
transaction buffer client.")
+ .setUnit("{transaction}")
+ .buildWithCallback(measurement ->
measurement.record(handler.getPendingRequestsCount()));
}
private Summary buildSummary(String name, String help, String[]
labelNames) {
@@ -77,33 +106,52 @@ public final class TransactionBufferClientStatsImpl
implements TransactionBuffer
return builder.register();
}
- public static synchronized TransactionBufferClientStats
getInstance(boolean exposeTopicLevelMetrics,
+ public static synchronized TransactionBufferClientStats
getInstance(PulsarService pulsarService,
+
boolean exposeTopicLevelMetrics,
TransactionBufferHandler handler) {
if (null == instance) {
- instance = new
TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+ instance = new TransactionBufferClientStatsImpl(pulsarService,
exposeTopicLevelMetrics, handler);
}
-
return instance;
}
@Override
public void recordAbortFailed(String topic) {
this.abortFailed.labels(labelValues(topic)).inc();
+ getTransactionBufferClientMetrics(topic)
+
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortFailedCount)
+ .ifPresent(LongAdder::increment);
}
@Override
public void recordCommitFailed(String topic) {
this.commitFailed.labels(labelValues(topic)).inc();
+ getTransactionBufferClientMetrics(topic)
+
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitFailedCount)
+ .ifPresent(LongAdder::increment);
}
@Override
public void recordAbortLatency(String topic, long nanos) {
this.abortLatency.labels(labelValues(topic)).observe(nanos);
+ getTransactionBufferClientMetrics(topic)
+
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getAbortSucceededCount)
+ .ifPresent(LongAdder::increment);
}
@Override
public void recordCommitLatency(String topic, long nanos) {
this.commitLatency.labels(labelValues(topic)).observe(nanos);
+ getTransactionBufferClientMetrics(topic)
+
.map(PersistentTopicMetrics.TransactionBufferClientMetrics::getCommitSucceededCount)
+ .ifPresent(LongAdder::increment);
+ }
+
+ private Optional<PersistentTopicMetrics.TransactionBufferClientMetrics>
getTransactionBufferClientMetrics(
+ String topic) {
+ return brokerService.getTopicReference(topic)
+ .filter(t -> t instanceof PersistentTopic)
+ .map(t -> ((PersistentTopic)
t).getPersistentTopicMetrics().getTransactionBufferClientMetrics());
}
private String[] labelValues(String topic) {
@@ -125,6 +173,7 @@ public final class TransactionBufferClientStatsImpl
implements TransactionBuffer
CollectorRegistry.defaultRegistry.unregister(this.abortLatency);
CollectorRegistry.defaultRegistry.unregister(this.commitLatency);
CollectorRegistry.defaultRegistry.unregister(this.pendingRequests);
+ pendingTransactionCounter.close();
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 168a6b1483f..dcebbb2829e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -145,6 +145,13 @@ public interface PendingAckHandle {
*/
TransactionPendingAckStats getStats(boolean lowWaterMarks);
+ /**
+ * Get the raw pending ack handle stats.
+ *
+ * @return the raw stats of this pending ack handle.
+ */
+ PendingAckHandleStats getPendingAckHandleStats();
+
/**
* Close the pending ack handle.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
new file mode 100644
index 00000000000..87363b673e1
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleAttributes.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionPendingAckOperationStatus;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.TransactionStatus;
+
+@Getter
+public class PendingAckHandleAttributes {
+
+ private final Attributes commitSuccessAttributes;
+ private final Attributes commitFailureAttributes;
+ private final Attributes abortSuccessAttributes;
+ private final Attributes abortFailureAttributes;
+
+ public PendingAckHandleAttributes(String topic, String subscription) {
+ var topicName = TopicName.get(topic);
+ commitSuccessAttributes = getAttributes(topicName, subscription,
TransactionStatus.COMMITTED,
+ TransactionPendingAckOperationStatus.SUCCESS);
+ commitFailureAttributes = getAttributes(topicName, subscription,
TransactionStatus.COMMITTED,
+ TransactionPendingAckOperationStatus.FAILURE);
+ abortSuccessAttributes = getAttributes(topicName, subscription,
TransactionStatus.ABORTED,
+ TransactionPendingAckOperationStatus.SUCCESS);
+ abortFailureAttributes = getAttributes(topicName, subscription,
TransactionStatus.ABORTED,
+ TransactionPendingAckOperationStatus.FAILURE);
+ }
+
+ private static Attributes getAttributes(TopicName topicName, String
subscriptionName,
+ TransactionStatus txStatus,
+
TransactionPendingAckOperationStatus txAckStoreStatus) {
+ var builder = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME,
subscriptionName)
+ .put(OpenTelemetryAttributes.PULSAR_TENANT,
topicName.getTenant())
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicName.getNamespace())
+ .put(OpenTelemetryAttributes.PULSAR_TOPIC,
topicName.getPartitionedTopicName())
+ .putAll(txStatus.attributes)
+ .putAll(txAckStoreStatus.attributes);
+ if (topicName.isPartitioned()) {
+ builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX,
topicName.getPartitionIndex());
+ }
+ return builder.build();
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
index 7f01b9b69f9..855651c9116 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java
@@ -28,6 +28,13 @@ public interface PendingAckHandleStats {
void close();
+ long getCommitSuccessCount();
+ long getCommitFailedCount();
+ long getAbortSuccessCount();
+ long getAbortFailedCount();
+
+ PendingAckHandleAttributes getAttributes();
+
static PendingAckHandleStats create(String topic, String subName, boolean
exposeTopicLevelMetrics) {
return new PendingAckHandleStatsImpl(topic, subName,
exposeTopicLevelMetrics);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 4d5852ea33d..fb633f7af65 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.MutablePair;
import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
@@ -91,6 +92,11 @@ public class PendingAckHandleDisabled implements
PendingAckHandle {
return null;
}
+ @Override
+ public PendingAckHandleStats getPendingAckHandleStats() {
+ return null;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 98d0d3bf1b9..6a071c891ff 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -155,20 +154,14 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
this.topicName = persistentSubscription.getTopicName();
this.subName = persistentSubscription.getName();
this.persistentSubscription = persistentSubscription;
- internalPinnedExecutor = persistentSubscription
- .getTopic()
- .getBrokerService()
- .getPulsar()
- .getTransactionExecutorProvider()
- .getExecutor(this);
-
- ServiceConfiguration config =
persistentSubscription.getTopic().getBrokerService().pulsar().getConfig();
- boolean exposeTopicLevelMetrics =
config.isExposeTopicLevelMetricsInPrometheus();
- this.handleStats = PendingAckHandleStats.create(topicName, subName,
exposeTopicLevelMetrics);
-
- this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
-
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
- transactionOpTimer =
persistentSubscription.getTopic().getBrokerService().getPulsar().getTransactionTimer();
+ var pulsar =
persistentSubscription.getTopic().getBrokerService().getPulsar();
+ internalPinnedExecutor =
pulsar.getTransactionExecutorProvider().getExecutor(this);
+
+ this.handleStats = PendingAckHandleStats.create(
+ topicName, subName,
pulsar.getConfig().isExposeTopicLevelMetricsInPrometheus());
+
+ this.pendingAckStoreProvider =
pulsar.getTransactionPendingAckStoreProvider();
+ transactionOpTimer = pulsar.getTransactionTimer();
init();
}
@@ -1021,6 +1014,11 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
return transactionInPendingAckStats;
}
+ @Override
+ public PendingAckHandleStats getPendingAckHandleStats() {
+ return handleStats;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
changeToCloseState();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
index f30c233af59..a89b582b838 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java
@@ -22,7 +22,10 @@ import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.StringUtils;
+import
org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleAttributes;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats;
import org.apache.pulsar.common.naming.TopicName;
@@ -37,6 +40,19 @@ public class PendingAckHandleStatsImpl implements
PendingAckHandleStats {
private final String[] labelFailed;
private final String[] commitLatencyLabel;
+ private final String topic;
+ private final String subscription;
+
+ private final LongAdder commitTxnSucceedCounter = new LongAdder();
+ private final LongAdder commitTxnFailedCounter = new LongAdder();
+ private final LongAdder abortTxnSucceedCounter = new LongAdder();
+ private final LongAdder abortTxnFailedCounter = new LongAdder();
+
+ private volatile PendingAckHandleAttributes attributes = null;
+ private static final
AtomicReferenceFieldUpdater<PendingAckHandleStatsImpl,
PendingAckHandleAttributes>
+ ATTRIBUTES_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ PendingAckHandleStatsImpl.class,
PendingAckHandleAttributes.class, "attributes");
+
public PendingAckHandleStatsImpl(String topic, String subscription,
boolean exposeTopicLevelMetrics) {
initialize(exposeTopicLevelMetrics);
@@ -51,6 +67,9 @@ public class PendingAckHandleStatsImpl implements
PendingAckHandleStats {
}
}
+ this.topic = topic;
+ this.subscription = subscription;
+
labelSucceed = exposeTopicLevelMetrics0
? new String[]{namespace, topic, subscription, "succeed"} :
new String[]{namespace, "succeed"};
labelFailed = exposeTopicLevelMetrics0
@@ -62,18 +81,24 @@ public class PendingAckHandleStatsImpl implements
PendingAckHandleStats {
@Override
public void recordCommitTxn(boolean success, long nanos) {
String[] labels;
+ LongAdder counter;
if (success) {
labels = labelSucceed;
+ counter = commitTxnSucceedCounter;
commitTxnLatency.labels(commitLatencyLabel).observe(TimeUnit.NANOSECONDS.toMicros(nanos));
} else {
labels = labelFailed;
+ counter = commitTxnFailedCounter;
}
commitTxnCounter.labels(labels).inc();
+ counter.increment();
}
@Override
public void recordAbortTxn(boolean success) {
abortTxnCounter.labels(success ? labelSucceed : labelFailed).inc();
+ var counter = success ? abortTxnSucceedCounter : abortTxnFailedCounter;
+ counter.increment();
}
@Override
@@ -81,11 +106,40 @@ public class PendingAckHandleStatsImpl implements
PendingAckHandleStats {
if (exposeTopicLevelMetrics0) {
commitTxnCounter.remove(this.labelSucceed);
commitTxnCounter.remove(this.labelFailed);
+ abortTxnCounter.remove(this.labelSucceed);
abortTxnCounter.remove(this.labelFailed);
- abortTxnCounter.remove(this.labelFailed);
}
}
+ @Override
+ public long getCommitSuccessCount() {
+ return commitTxnSucceedCounter.sum();
+ }
+
+ @Override
+ public long getCommitFailedCount() {
+ return commitTxnFailedCounter.sum();
+ }
+
+ @Override
+ public long getAbortSuccessCount() {
+ return abortTxnSucceedCounter.sum();
+ }
+
+ @Override
+ public long getAbortFailedCount() {
+ return abortTxnFailedCounter.sum();
+ }
+
+ @Override
+ public PendingAckHandleAttributes getAttributes() {
+ if (attributes != null) {
+ return attributes;
+ }
+ return ATTRIBUTES_UPDATER.updateAndGet(PendingAckHandleStatsImpl.this,
+ old -> old != null ? old : new
PendingAckHandleAttributes(topic, subscription));
+ }
+
static void initialize(boolean exposeTopicLevelMetrics) {
if (INITIALIZED.compareAndSet(false, true)) {
exposeTopicLevelMetrics0 = exposeTopicLevelMetrics;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 14d4375b7bf..2a928084e64 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.transaction;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME;
import static
org.apache.pulsar.common.naming.SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
import static
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
@@ -94,7 +95,6 @@ import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -231,19 +231,35 @@ public class TransactionTest extends TransactionTestBase {
.build();
var metrics =
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics();
- BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
+ Attributes.builder()
+ .putAll(attributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+ .build(),
+ 1);
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
+ Attributes.builder()
+ .putAll(attributes)
+ .remove(OpenTelemetryAttributes.PULSAR_DOMAIN)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.SUCCESS.attributes)
+ .build(),
+ 1);
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
.build(),
1);
- BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
.build(),
1);
- BrokerOpenTelemetryTestUtil.assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
+ assertMetricLongSumValue(metrics,
OpenTelemetryTopicStats.TRANSACTION_COUNTER,
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "active")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index af12caf1efd..dea79f391e3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
+import io.opentelemetry.api.common.Attributes;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -44,6 +46,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
@@ -57,6 +60,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
@@ -70,7 +74,6 @@ import org.testng.annotations.Test;
public class TopicTransactionBufferTest extends TransactionTestBase {
-
@BeforeMethod(alwaysRun = true)
protected void setup() throws Exception {
setBrokerCount(1);
@@ -101,10 +104,19 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
@Test
public void testTransactionBufferAppendMarkerWriteFailState() throws
Exception {
final String topic = "persistent://" + NAMESPACE1 +
"/testPendingAckManageLedgerWriteFailState";
+ var attributes = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx")
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1")
+ .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionBufferClientOperationStatus.FAILURE.attributes)
+ .build();
+
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
@@ -112,11 +124,19 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
.enableBatching(false)
.create();
+ assertMetricLongSumValue(
+
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(),
+
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
attributes, 0);
+
producer.newMessage(txn).value("test".getBytes()).send();
PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(),
false).get().get();
FieldUtils.writeField(persistentTopic.getManagedLedger(), "state",
ManagedLedgerImpl.State.WriteFailed, true);
txn.commit().get();
+
+ assertMetricLongSumValue(
+
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics(),
+
OpenTelemetryTopicStats.TRANSACTION_BUFFER_CLIENT_OPERATION_COUNTER,
attributes, 1);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 00cdb4162f0..9487e3d3746 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.transaction.pendingack;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.mockito.ArgumentMatchers.any;
@@ -31,6 +32,7 @@ import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import com.google.common.collect.Multimap;
+import io.opentelemetry.api.common.Attributes;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -56,6 +58,7 @@ import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import
org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import
org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
@@ -78,6 +81,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -366,6 +370,42 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
assertTrue(metric.value > 0);
}
}
+
+ var otelMetrics =
pulsarTestContexts.get(0).getOpenTelemetryMetricReader().collectAllMetrics();
+ var commonAttributes = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_TENANT, "tnx")
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tnx/ns1")
+ .put(OpenTelemetryAttributes.PULSAR_TOPIC,
TopicName.get(PENDING_ACK_REPLAY_TOPIC).toString())
+ .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subName)
+ .build();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+ Attributes.builder()
+ .putAll(commonAttributes)
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS,
"success")
+ .build(),
+ 50);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+ Attributes.builder()
+ .putAll(commonAttributes)
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "committed")
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS,
"failure")
+ .build(),
+ 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+ Attributes.builder()
+ .putAll(commonAttributes)
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS,
"success")
+ .build(),
+ 50);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryTransactionPendingAckStoreStats.ACK_COUNTER,
+ Attributes.builder()
+ .putAll(commonAttributes)
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_STATUS, "aborted")
+
.put(OpenTelemetryAttributes.PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS,
"failure")
+ .build(),
+ 0);
}
@Test
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index b530b50ee59..f485e300926 100644
---
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -117,12 +117,43 @@ public interface OpenTelemetryAttributes {
*/
AttributeKey<String> PULSAR_TRANSACTION_STATUS =
AttributeKey.stringKey("pulsar.transaction.status");
enum TransactionStatus {
+ ABORTED,
ACTIVE,
COMMITTED,
- ABORTED;
+ CREATED,
+ TIMEOUT;
public final Attributes attributes =
Attributes.of(PULSAR_TRANSACTION_STATUS, name().toLowerCase());
}
+ /**
+ * The status of the Pulsar transaction ack store operation.
+ */
+ AttributeKey<String> PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS =
+
AttributeKey.stringKey("pulsar.transaction.pending.ack.store.operation.status");
+ enum TransactionPendingAckOperationStatus {
+ SUCCESS,
+ FAILURE;
+ public final Attributes attributes =
+ Attributes.of(PULSAR_TRANSACTION_ACK_STORE_OPERATION_STATUS,
name().toLowerCase());
+ }
+
+ /**
+ * The ID of the Pulsar transaction coordinator.
+ */
+ AttributeKey<Long> PULSAR_TRANSACTION_COORDINATOR_ID =
AttributeKey.longKey("pulsar.transaction.coordinator.id");
+
+ /**
+ * The status of the Pulsar transaction buffer client operation.
+ */
+ AttributeKey<String> PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS =
+
AttributeKey.stringKey("pulsar.transaction.buffer.client.operation.status");
+ enum TransactionBufferClientOperationStatus {
+ SUCCESS,
+ FAILURE;
+ public final Attributes attributes =
+
Attributes.of(PULSAR_TRANSACTION_BUFFER_CLIENT_OPERATION_STATUS,
name().toLowerCase());
+ }
+
/**
* The status of the Pulsar compaction operation.
*/
diff --git a/pulsar-transaction/coordinator/pom.xml
b/pulsar-transaction/coordinator/pom.xml
index 4728cd40634..fc326d9e9ba 100644
--- a/pulsar-transaction/coordinator/pom.xml
+++ b/pulsar-transaction/coordinator/pom.xml
@@ -41,6 +41,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>managed-ledger</artifactId>
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
index ff5adb4d409..850fcfb4d19 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java
@@ -133,6 +133,15 @@ public interface TransactionMetadataStore {
*/
TransactionMetadataStoreStats getMetadataStoreStats();
+ /**
+ * Get the transaction metadata store OpenTelemetry attributes.
+ *
+ * @return TransactionMetadataStoreAttributes {@link
TransactionMetadataStoreAttributes}
+ */
+ default TransactionMetadataStoreAttributes getAttributes() {
+ return new TransactionMetadataStoreAttributes(this);
+ }
+
/**
* Get the transactions whose timeout is bigger than the given timeout.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
similarity index 50%
copy from
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
copy to
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
index 048edafe884..e8ae0f6d039 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreAttributes.java
@@ -16,58 +16,44 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service;
+package org.apache.pulsar.transaction.coordinator;
import io.opentelemetry.api.common.Attributes;
import lombok.Getter;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
@Getter
-public class PersistentTopicAttributes extends TopicAttributes {
-
- private final Attributes timeBasedQuotaAttributes;
- private final Attributes sizeBasedQuotaAttributes;
-
- private final Attributes compactionSuccessAttributes;
- private final Attributes compactionFailureAttributes;
-
- private final Attributes transactionActiveAttributes;
- private final Attributes transactionCommittedAttributes;
- private final Attributes transactionAbortedAttributes;
-
- public PersistentTopicAttributes(TopicName topicName) {
- super(topicName);
-
- timeBasedQuotaAttributes = Attributes.builder()
- .putAll(commonAttributes)
-
.putAll(OpenTelemetryAttributes.BacklogQuotaType.TIME.attributes)
- .build();
- sizeBasedQuotaAttributes = Attributes.builder()
+public class TransactionMetadataStoreAttributes {
+
+ private final Attributes commonAttributes;
+ private final Attributes txnAbortedAttributes;
+ private final Attributes txnActiveAttributes;
+ private final Attributes txnCommittedAttributes;
+ private final Attributes txnCreatedAttributes;
+ private final Attributes txnTimeoutAttributes;
+
+ public TransactionMetadataStoreAttributes(TransactionMetadataStore store) {
+ this.commonAttributes = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_TRANSACTION_COORDINATOR_ID,
store.getTransactionCoordinatorID().getId());
+ this.txnAbortedAttributes = Attributes.builder()
.putAll(commonAttributes)
-
.putAll(OpenTelemetryAttributes.BacklogQuotaType.SIZE.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
.build();
-
- transactionActiveAttributes = Attributes.builder()
+ this.txnActiveAttributes = Attributes.builder()
.putAll(commonAttributes)
.putAll(OpenTelemetryAttributes.TransactionStatus.ACTIVE.attributes)
.build();
- transactionCommittedAttributes = Attributes.builder()
+ this.txnCommittedAttributes = Attributes.builder()
.putAll(commonAttributes)
.putAll(OpenTelemetryAttributes.TransactionStatus.COMMITTED.attributes)
.build();
- transactionAbortedAttributes = Attributes.builder()
- .putAll(commonAttributes)
-
.putAll(OpenTelemetryAttributes.TransactionStatus.ABORTED.attributes)
- .build();
-
- compactionSuccessAttributes = Attributes.builder()
+ this.txnCreatedAttributes = Attributes.builder()
.putAll(commonAttributes)
-
.putAll(OpenTelemetryAttributes.CompactionStatus.SUCCESS.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.CREATED.attributes)
.build();
- compactionFailureAttributes = Attributes.builder()
+ this.txnTimeoutAttributes = Attributes.builder()
.putAll(commonAttributes)
-
.putAll(OpenTelemetryAttributes.CompactionStatus.FAILURE.attributes)
+
.putAll(OpenTelemetryAttributes.TransactionStatus.TIMEOUT.attributes)
.build();
}
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
index 0f3c5e42d7a..7817d484875 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java
@@ -23,12 +23,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
@@ -49,6 +51,11 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
private final LongAdder abortTransactionCount;
private final LongAdder transactionTimeoutCount;
+ private volatile TransactionMetadataStoreAttributes attributes = null;
+ private static final AtomicReferenceFieldUpdater<InMemTransactionMetadataStore, TransactionMetadataStoreAttributes>
+ ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ InMemTransactionMetadataStore.class, TransactionMetadataStoreAttributes.class, "attributes");
+
InMemTransactionMetadataStore(TransactionCoordinatorID tcID) {
this.tcID = tcID;
this.localID = new AtomicLong(0L);
@@ -165,4 +172,13 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore {
public List<TxnMeta> getSlowTransactions(long timeout) {
return null;
}
+
+ @Override
+ public TransactionMetadataStoreAttributes getAttributes() {
+ if (attributes != null) {
+ return attributes;
+ }
+ return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
+ old -> old != null ? old : new TransactionMetadataStoreAttributes(this));
+ }
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index b6eaad2e3e3..6bd7a947e38 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
@@ -45,6 +46,7 @@ import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -83,6 +85,11 @@ public class MLTransactionMetadataStore
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
private final long maxActiveTransactionsPerCoordinator;
+ private volatile TransactionMetadataStoreAttributes attributes = null;
+ private static final AtomicReferenceFieldUpdater<MLTransactionMetadataStore, TransactionMetadataStoreAttributes>
+ ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ MLTransactionMetadataStore.class, TransactionMetadataStoreAttributes.class, "attributes");
+
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
@@ -549,4 +556,13 @@ public class MLTransactionMetadataStore
public ManagedLedger getManagedLedger() {
return this.transactionLog.getManagedLedger();
}
+
+ @Override
+ public TransactionMetadataStoreAttributes getAttributes() {
+ if (attributes != null) {
+ return attributes;
+ }
+ return ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
+ old -> old != null ? old : new TransactionMetadataStoreAttributes(this));
+ }
}