nicoloboschi commented on code in PR #15140:
URL: https://github.com/apache/pulsar/pull/15140#discussion_r902569967


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientStats.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientStatsImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+
+public interface TransactionBufferClientStats {
+
+    void recordAbortFailed(String topic);
+
+    void recordCommitFailed(String topic);
+
+    void recordAbortLatency(String topic, long cost);
+
+    void recordCommitLatency(String topic, long cost);
+
+    void close();

Review Comment:
   We could make this interface extend `AutoCloseable`.
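
   A minimal sketch of what that might look like, assuming the interface simply extends `AutoCloseable` and narrows `close()` so that it declares no checked exception. `NoopStats` and `AutoCloseableDemo` are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: mirrors the interface under review, with AutoCloseable added.
interface TransactionBufferClientStats extends AutoCloseable {
    void recordAbortFailed(String topic);

    void recordCommitFailed(String topic);

    void recordAbortLatency(String topic, long cost);

    void recordCommitLatency(String topic, long cost);

    // Narrowed override: no checked exception, so callers need no catch block.
    @Override
    void close();
}

// Hypothetical no-op implementation, used only to exercise the interface.
final class NoopStats implements TransactionBufferClientStats {
    final AtomicBoolean closed = new AtomicBoolean(false);

    @Override public void recordAbortFailed(String topic) { }
    @Override public void recordCommitFailed(String topic) { }
    @Override public void recordAbortLatency(String topic, long cost) { }
    @Override public void recordCommitLatency(String topic, long cost) { }

    @Override
    public void close() {
        closed.set(true);
    }
}

final class AutoCloseableDemo {
    // Returns true if try-with-resources invoked close() on the stats object.
    static boolean closedAfterTryWithResources() {
        NoopStats stats = new NoopStats();
        try (TransactionBufferClientStats s = stats) {
            s.recordCommitLatency("persistent://public/default/my-topic", 5L);
        }
        return stats.closed.get();
    }
}
```

   With this shape, callers can use try-with-resources, and existing `close()` call sites should keep compiling because the override throws nothing.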



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -245,6 +248,20 @@ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxnEnabled)
         }
     }
 
+    @Override
+    public long getOngoingTxnCount() {

Review Comment:
   IIUC, `ongoingTxns` and `aborts` are real gauges because they can be increased or decreased.
   `txnCommittedCounter` is only ever incremented, so it is a counter.
   Is that correct?
   If so, I believe we have to introduce a `txnAbortedCounter`.
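
   To illustrate the distinction, here is a rough sketch using plain `LongAdder` fields rather than the Prometheus types. The class `TxnCountersSketch` and its `onTxn*` methods are hypothetical, and the assumption that every commit or abort ends exactly one ongoing transaction is mine, not something taken from `TopicTransactionBuffer`.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: one gauge-like value and two monotonic counters.
final class TxnCountersSketch {
    // Gauge semantics: goes up when a txn starts, down when it ends.
    private final LongAdder ongoingTxns = new LongAdder();
    // Counter semantics: only ever incremented.
    private final LongAdder txnCommittedCounter = new LongAdder();
    private final LongAdder txnAbortedCounter = new LongAdder();

    void onTxnStarted() {
        ongoingTxns.increment();
    }

    void onTxnCommitted() {
        ongoingTxns.decrement();
        txnCommittedCounter.increment();
    }

    void onTxnAborted() {
        ongoingTxns.decrement();
        txnAbortedCounter.increment();
    }

    long getOngoingTxnCount() {
        return ongoingTxns.sum();
    }

    long getCommittedTxnCount() {
        return txnCommittedCounter.sum();
    }

    long getAbortedTxnCount() {
        return txnAbortedCounter.sum();
    }
}
```

   The point is only that the aborted total never decreases, so exposing it as a counter lets Prometheus apply `rate()` to it, which would not be meaningful for a gauge.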



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandleStats.java:
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleStatsImpl;
+
+public interface PendingAckHandleStats {
+
+    void recordCommitTxn(boolean success);
+
+    void recordAbortTxn(boolean success);
+
+    void close();

Review Comment:
   Should this extend `AutoCloseable`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    private static TransactionBufferClientStats instance;
+
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long cost) {
+        this.abortLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long cost) {
+        this.commitLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {

Review Comment:
   Should we log the exception here?
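
   Something along these lines, as a sketch only. It uses `java.util.logging` so the snippet is self-contained, whereas the real class would presumably use the broker's own logger. `LabelValuesSketch` and `parseNamespace` are hypothetical; `parseNamespace` is a simplified stand-in for `TopicName.get(topic).getNamespace()`, not its actual behavior.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of labelValues() with the failure logged before falling back.
final class LabelValuesSketch {
    private static final Logger LOG = Logger.getLogger(LabelValuesSketch.class.getName());

    private final boolean exposeTopicLevelMetrics;

    LabelValuesSketch(boolean exposeTopicLevelMetrics) {
        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
    }

    // Simplified stand-in parser: expects "<domain>://<tenant>/<namespace>/<topic>".
    static String parseNamespace(String topic) {
        int idx = topic.indexOf("://");
        if (idx < 0) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        String[] parts = topic.substring(idx + 3).split("/");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
        return parts[0] + "/" + parts[1];
    }

    String[] labelValues(String topic) {
        try {
            String namespace = parseNamespace(topic);
            return exposeTopicLevelMetrics
                    ? new String[]{namespace, topic} : new String[]{namespace};
        } catch (Throwable t) {
            // Log the cause so a malformed topic name is visible, then fall back.
            LOG.log(Level.WARNING, "Failed to resolve labels for topic '" + topic + "'", t);
            return exposeTopicLevelMetrics
                    ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
        }
    }
}
```

   Without the log line, a malformed topic name would only show up as an `unknown` label in the metrics, with no hint about which topic caused it.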



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.naming.TopicName;
+
+public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats {
+    private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1};
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Counter abortFailed;
+    private final Counter commitFailed;
+    private final Summary abortLatency;
+    private final Summary commitLatency;
+    private final Gauge pendingRequests;
+
+    private final boolean exposeTopicLevelMetrics;
+
+    private static TransactionBufferClientStats instance;
+
+    private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics,
+                                             TransactionBufferHandler handler) {
+        this.exposeTopicLevelMetrics = exposeTopicLevelMetrics;
+        String[] labelNames = exposeTopicLevelMetrics
+                ? new String[]{"namespace", "topic"} : new String[]{"namespace"};
+
+        this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.commitFailed = Counter.build("pulsar_txn_tb_client_commit_failed", "-")
+                .labelNames(labelNames)
+                .register();
+        this.abortLatency =
+                this.buildSummary("pulsar_txn_tb_client_abort_latency", "-", labelNames);
+        this.commitLatency =
+                this.buildSummary("pulsar_txn_tb_client_commit_latency", "-", labelNames);
+
+        this.pendingRequests = Gauge.build("pulsar_txn_tb_client_pending_requests", "-")
+                .register()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return null == handler ? 0 : handler.getPendingRequestsCount();
+                    }
+                });
+    }
+
+    private Summary buildSummary(String name, String help, String[] labelNames) {
+        Summary.Builder builder = Summary.build(name, help)
+                .labelNames(labelNames);
+        for (double quantile : QUANTILES) {
+            builder.quantile(quantile, 0.01D);
+        }
+        return builder.register();
+    }
+
+    public static synchronized TransactionBufferClientStats getInstance(boolean exposeTopicLevelMetrics,
+                                                                        TransactionBufferHandler handler) {
+        if (null == instance) {
+            instance = new TransactionBufferClientStatsImpl(exposeTopicLevelMetrics, handler);
+        }
+
+        return instance;
+    }
+
+    @Override
+    public void recordAbortFailed(String topic) {
+        this.abortFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordCommitFailed(String topic) {
+        this.commitFailed.labels(labelValues(topic)).inc();
+    }
+
+    @Override
+    public void recordAbortLatency(String topic, long cost) {
+        this.abortLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    @Override
+    public void recordCommitLatency(String topic, long cost) {
+        this.commitLatency.labels(labelValues(topic)).observe(cost);
+    }
+
+    private String[] labelValues(String topic) {
+        try {
+            TopicName topicName = TopicName.get(topic);
+            return exposeTopicLevelMetrics
+                    ? new String[]{topicName.getNamespace(), topic} : new String[]{topicName.getNamespace()};
+        } catch (Throwable t) {
+            return exposeTopicLevelMetrics ? new String[]{"unknown", "unknown"} : new String[]{"unknown"};
+        }
+    }
+
+    @Override
+    public void close() {
+        if (instance == this && this.closed.compareAndSet(false, true)) {

Review Comment:
   Why is the `instance == this` check needed? If I call `close()` on this instance, I
   expect this object to be closed even if it is not the singleton.
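
   A sketch of the behavior I would expect, assuming the idempotency guard alone is enough. `ClosableStatsSketch` is a hypothetical class, and the `unregister` hook is a stand-in for whatever cleanup the real `close()` performs (presumably unregistering the collectors); the body of `close()` is not visible in this hunk, so that part is an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: close() guarded only by the closed flag, no singleton check.
final class ClosableStatsSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Stand-in for the real cleanup work (an assumption, not the PR's code).
    private final Runnable unregister;

    ClosableStatsSketch(Runnable unregister) {
        this.unregister = unregister;
    }

    @Override
    public void close() {
        // compareAndSet makes close() idempotent: cleanup runs at most once
        // per object, whether or not this object is the shared singleton.
        if (closed.compareAndSet(false, true)) {
            unregister.run();
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

   If the singleton check exists to protect the shared collectors from being unregistered by some other instance, a short comment saying so would help.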


