tjiuming commented on code in PR #15140: URL: https://github.com/apache/pulsar/pull/15140#discussion_r902511460
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleStatsImpl.java: ########## @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import io.prometheus.client.Counter; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; +import org.apache.pulsar.common.naming.TopicName; + +public class PendingAckHandleStatsImpl implements PendingAckHandleStats { + private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false); + private static Counter commitTxnCounter; + private static Counter abortTxnCounter; + private static boolean exposeTopicLevelMetrics0; + + private final String[] labelSucceed; + private final String[] labelFailed; + + public PendingAckHandleStatsImpl(String topic, String subscription, boolean exposeTopicLevelMetrics) { + initialize(exposeTopicLevelMetrics); + + String namespace; + if (StringUtils.isBlank(topic)) { + namespace = topic = "unknown"; Review Comment: right, it's not possible that topic is blank, it just in case of 
unexpected behaviors. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientStatsImpl.java: ########## @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Summary; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferClientStats; +import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; +import org.apache.pulsar.common.naming.TopicName; + +public final class TransactionBufferClientStatsImpl implements TransactionBufferClientStats { + private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 0.9999, 1}; + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Counter abortFailed; + private final Counter commitFailed; + private final Summary abortLatency; + private final Summary commitLatency; + private final Gauge pendingRequests; + + private final boolean exposeTopicLevelMetrics; + + private static 
TransactionBufferClientStats instance; + + private TransactionBufferClientStatsImpl(boolean exposeTopicLevelMetrics, + TransactionBufferHandler handler) { + this.exposeTopicLevelMetrics = exposeTopicLevelMetrics; + String[] labelNames = exposeTopicLevelMetrics + ? new String[]{"namespace", "topic"} : new String[]{"namespace"}; + + this.abortFailed = Counter.build("pulsar_txn_tb_client_abort_failed", "-") + .labelNames(labelNames) + .register(); Review Comment: Yes. When running 2 brokers inside the same JVM, they just bind 2 different ports; other behaviors make no difference. Yes, many other metrics use the default registry. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java: ########## @@ -35,47 +37,88 @@ public class TransactionBufferClientImpl implements TransactionBufferClient { private final TransactionBufferHandler tbHandler; + private final TransactionBufferClientStats stats; - private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) { + private TransactionBufferClientImpl(TransactionBufferHandler tbHandler, boolean exposeTopicLevelMetrics, + boolean enableTxnCoordinator) { this.tbHandler = tbHandler; + this.stats = TransactionBufferClientStats.create(exposeTopicLevelMetrics, tbHandler, enableTxnCoordinator); } public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer, int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException { TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer, maxConcurrentRequests, operationTimeoutInMills); - return new TransactionBufferClientImpl(handler); + + ServiceConfiguration config = pulsarService.getConfig(); + boolean exposeTopicLevelMetrics = config.isExposeTopicLevelMetricsInPrometheus(); + boolean enableTxnCoordinator = config.isTransactionCoordinatorEnabled(); + return new TransactionBufferClientImpl(handler, exposeTopicLevelMetrics, 
enableTxnCoordinator); } @Override public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, long lowWaterMark) { - return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, TxnAction.COMMIT, lowWaterMark); + long start = System.currentTimeMillis(); Review Comment: It makes sense, I'll change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
