This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebfef6de5e2 [optimize][admin]Enhancing Transaction Buffer Stats and
Introducing TransactionBufferInternalStats API (#20330)
ebfef6de5e2 is described below
commit ebfef6de5e2026aa2226a4776dba07df0897df28
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Jul 22 19:06:04 2023 +0800
[optimize][admin]Enhancing Transaction Buffer Stats and Introducing
TransactionBufferInternalStats API (#20330)
Master issue: https://github.com/apache/pulsar/issues/20291
### Motivation
Our primary goal is to improve the visibility and troubleshooting
capabilities of the Pulsar Transaction Buffer by providing more detailed
information about the snapshot stats and system topic internal status.
### Modifications
1. Enhance the existing TransactionBufferStats by adding information about
snapshot stats, including the capacity of the current segment, the number of
unsealed aborted transaction IDs, and other related data. This will provide
better visibility and troubleshooting capabilities for the Pulsar Transaction
Buffer.
2. Introduce a new API for obtaining TransactionBufferInternalStats,
allowing users to access the state of the system topic used for storing
snapshots. This will facilitate problem investigation and resolution when
issues arise with the transaction buffer.
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 69 ++++++++++++-
.../pulsar/broker/admin/v3/Transactions.java | 68 ++++++++++---
.../broker/service/persistent/PersistentTopic.java | 6 +-
.../transaction/buffer/AbortedTxnProcessor.java | 11 ++-
.../transaction/buffer/TransactionBuffer.java | 17 ++++
.../buffer/impl/InMemTransactionBuffer.java | 14 ++-
.../SingleSnapshotAbortedTxnProcessorImpl.java | 8 +-
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 28 +++++-
.../buffer/impl/TopicTransactionBuffer.java | 21 +++-
.../buffer/impl/TransactionBufferDisable.java | 11 +++
.../v3/AdminApiTransactionMultiBrokerTest.java | 46 +++++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 107 ++++++++++++++++++++-
.../SegmentAbortedTxnProcessorTest.java | 88 ++++++++++++++++-
.../pulsar/broker/transaction/TransactionTest.java | 4 +-
.../broker/transaction/TransactionTestBase.java | 3 +-
.../buffer/TransactionStablePositionTest.java | 4 +-
.../apache/pulsar/client/admin/Transactions.java | 63 ++++++++++--
...ansactionBufferStats.java => SegmentStats.java} | 33 ++-----
...nsactionBufferStats.java => SegmentsStats.java} | 34 +++----
....java => SnapshotSystemTopicInternalStats.java} | 30 +-----
...ts.java => TransactionBufferInternalStats.java} | 32 ++----
.../policies/data/TransactionBufferStats.java | 9 ++
.../client/admin/internal/TransactionsImpl.java | 26 ++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 6 +-
.../apache/pulsar/admin/cli/CmdTransactions.java | 22 ++++-
25 files changed, 619 insertions(+), 141 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index d596cbdd39d..3921334cff3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
@@ -38,6 +39,7 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
@@ -47,6 +49,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.SnapshotSystemTopicInternalStats;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -170,9 +174,10 @@ public abstract class TransactionsBase extends
AdminResource {
}
protected CompletableFuture<TransactionBufferStats>
internalGetTransactionBufferStats(boolean authoritative,
-
boolean lowWaterMarks) {
+
boolean lowWaterMarks,
+
boolean segmentStats) {
return getExistingPersistentTopicAsync(authoritative)
- .thenApply(topic ->
topic.getTransactionBufferStats(lowWaterMarks));
+ .thenApply(topic ->
topic.getTransactionBufferStats(lowWaterMarks, segmentStats));
}
protected CompletableFuture<TransactionPendingAckStats>
internalGetPendingAckStats(
@@ -431,6 +436,66 @@ public abstract class TransactionsBase extends
AdminResource {
);
}
+ protected CompletableFuture<TransactionBufferInternalStats>
internalGetTransactionBufferInternalStats(
+ boolean authoritative, boolean metadata) {
+ TransactionBufferInternalStats transactionBufferInternalStats = new
TransactionBufferInternalStats();
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic -> {
+ AbortedTxnProcessor.SnapshotType snapshotType =
topic.getTransactionBuffer().getSnapshotType();
+ if (snapshotType == null) {
+ return FutureUtil.failedFuture(new
RestException(NOT_FOUND,
+ "Transaction buffer Snapshot for the topic
does not exist"));
+ } else if (snapshotType ==
AbortedTxnProcessor.SnapshotType.Segment) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName segmentTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ CompletableFuture<SnapshotSystemTopicInternalStats>
segmentInternalStatsFuture =
+ getTxnSnapshotInternalStats(segmentTopic,
metadata);
+ TopicName indexTopic =
TopicName.get(TopicDomain.persistent.toString(),
+ namespaceName,
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+ CompletableFuture<SnapshotSystemTopicInternalStats>
segmentIndexInternalStatsFuture =
+ getTxnSnapshotInternalStats(indexTopic,
metadata);
+ return segmentIndexInternalStatsFuture
+ .thenCombine(segmentInternalStatsFuture,
(indexStats, segmentStats) -> {
+
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
+
transactionBufferInternalStats.segmentInternalStats = segmentStats;
+ return transactionBufferInternalStats;
+ });
+ } else if (snapshotType ==
AbortedTxnProcessor.SnapshotType.Single) {
+ transactionBufferInternalStats.snapshotType =
snapshotType.toString();
+ TopicName singleSnapshotTopic =
TopicName.get(TopicDomain.persistent.toString(), namespaceName,
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ return
getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
+ .thenApply(snapshotSystemTopicInternalStats ->
{
+
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
+ snapshotSystemTopicInternalStats;
+ return transactionBufferInternalStats;
+ });
+ }
+ return FutureUtil.failedFuture(new
RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+ + snapshotType));
+ });
+ }
+
+ private CompletableFuture<SnapshotSystemTopicInternalStats>
getTxnSnapshotInternalStats(TopicName topicName,
+
boolean metadata) {
+ final PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ return FutureUtil.failedFuture(new RestException(e));
+ }
+ return admin.topics().getInternalStatsAsync(topicName.toString(),
metadata)
+ .thenApply(persistentTopicInternalStats -> {
+ SnapshotSystemTopicInternalStats
+ snapshotSystemTopicInternalStats = new
SnapshotSystemTopicInternalStats();
+
snapshotSystemTopicInternalStats.managedLedgerInternalStats =
persistentTopicInternalStats;
+ snapshotSystemTopicInternalStats.managedLedgerName
= topicName.getEncodedLocalName();
+ return snapshotSystemTopicInternalStats;
+ });
+ }
+
protected CompletableFuture<PersistentTopic>
getExistingPersistentTopicAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName,
authoritative).thenCompose(__ -> {
CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index aa24dbdcc3a..1bdc2255085 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.util.FutureUtil;
+import org.jetbrains.annotations.Nullable;
@Path("/transactions")
@Produces(MediaType.APPLICATION_JSON)
@@ -171,11 +172,13 @@ public class Transactions extends TransactionsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic,
@QueryParam("lowWaterMarks")
@DefaultValue("false")
- boolean lowWaterMarks) {
+ boolean lowWaterMarks,
+ @QueryParam("segmentStats")
@DefaultValue("false")
+ boolean segmentStats) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionBufferStats(authoritative, lowWaterMarks)
+ internalGetTransactionBufferStats(authoritative, lowWaterMarks,
segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
@@ -315,17 +318,58 @@ public class Transactions extends TransactionsBase {
log.error("[{}] Failed to get pending ack internal
stats {}",
clientAppId(), topicName, ex);
}
- Throwable cause =
FutureUtil.unwrapCompletionException(ex);
- if (cause instanceof
BrokerServiceException.ServiceUnitNotReadyException) {
- asyncResponse.resume(new
RestException(SERVICE_UNAVAILABLE, cause));
- } else if (cause instanceof
BrokerServiceException.NotAllowedException) {
- asyncResponse.resume(new
RestException(METHOD_NOT_ALLOWED, cause));
- } else if (cause instanceof
BrokerServiceException.SubscriptionNotFoundException) {
- asyncResponse.resume(new RestException(NOT_FOUND,
cause));
- } else {
- asyncResponse.resume(new RestException(cause));
+ return
resumeAsyncResponseWithBrokerException(asyncResponse, ex);
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ }
+
+ @Nullable
+ private Void resumeAsyncResponseWithBrokerException(@Suspended
AsyncResponse asyncResponse,
+ Throwable ex) {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
BrokerServiceException.ServiceUnitNotReadyException) {
+ asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
cause));
+ } else if (cause instanceof
BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
+ } else if (cause instanceof
BrokerServiceException.SubscriptionNotFoundException) {
+ asyncResponse.resume(new RestException(NOT_FOUND, cause));
+ } else {
+ asyncResponse.resume(new RestException(cause));
+ }
+ return null;
+ }
+
+ @GET
+ @Path("/transactionBufferInternalStats/{tenant}/{namespace}/{topic}")
+ @ApiOperation(value = "Get transaction buffer internal stats.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not enable
transaction"),
+ @ApiResponse(code = 307, message = "Topic is not owned by this
broker!"),
+ @ApiResponse(code = 405, message = "Transaction buffer don't use
managedLedger!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")
+ })
+ public void getTransactionBufferInternalStats(@Suspended final
AsyncResponse asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false")
boolean authoritative,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace")
String namespace,
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @QueryParam("metadata")
@DefaultValue("false") boolean metadata) {
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionBufferInternalStats(authoritative, metadata)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction buffer
internal stats {}",
+ clientAppId(), topicName, ex);
}
- return null;
+ return
resumeAsyncResponseWithBrokerException(asyncResponse, ex);
});
} catch (Exception ex) {
resumeAsyncResponseExceptionally(asyncResponse, ex);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1e055eccc42..e907caa8c30 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3566,7 +3566,11 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
public TransactionBufferStats getTransactionBufferStats(boolean
lowWaterMarks) {
- return this.transactionBuffer.getStats(lowWaterMarks);
+ return getTransactionBufferStats(lowWaterMarks, false);
+ }
+
+ public TransactionBufferStats getTransactionBufferStats(boolean
lowWaterMarks, boolean segmentStats) {
+ return this.transactionBuffer.getStats(lowWaterMarks, segmentStats);
}
public TransactionPendingAckStats getTransactionPendingAckStats(String
subName, boolean lowWaterMarks) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
index 8223aa12b75..0f06c201a81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/AbortedTxnProcessor.java
@@ -21,10 +21,16 @@ package org.apache.pulsar.broker.transaction.buffer;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
public interface AbortedTxnProcessor {
+ enum SnapshotType {
+ Single,
+ Segment,
+ }
+
/**
* After the transaction buffer writes a transaction aborted marker to the
topic,
* the transaction buffer will put the aborted txnID and the aborted
marker position to AbortedTxnProcessor.
@@ -66,9 +72,10 @@ public interface AbortedTxnProcessor {
/**
* Get the lastSnapshotTimestamps.
- * @return the lastSnapshotTimestamps.
+ *
+ * @return a transactionBufferStats with the stats in the
abortedTxnProcessor.
*/
- long getLastSnapshotTimestamps();
+ TransactionBufferStats generateSnapshotStats(boolean segmentStats);
CompletableFuture<Void> closeAsync();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 9c32f762996..7eb5d6f789c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -158,12 +158,29 @@ public interface TransactionBuffer {
*/
PositionImpl getMaxReadPosition();
+ /**
+ * Get the snapshot type.
+ *
+ * The snapshot type can be either "Single" or "Segment". In "Single"
mode, a single snapshot log is used
+ * to record the transaction buffer stats. In "Segment" mode, a snapshot
segment topic is used to record
+ * the stats, and a separate snapshot segment index topic is used to index
these stats.
+ *
+ * @return the snapshot type
+ */
+ AbortedTxnProcessor.SnapshotType getSnapshotType();
+
/**
* Get transaction in buffer stats.
* @return the transaction in buffer stats.
*/
TransactionInBufferStats getTransactionInBufferStats(TxnID txnID);
+ /**
+ * Get transaction stats in buffer.
+ * @return the transaction stats in buffer.
+ */
+ TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats);
+
/**
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 56b49f98efe..bc2dd58a581 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -374,16 +375,27 @@ class InMemTransactionBuffer implements TransactionBuffer
{
return PositionImpl.LATEST;
}
+ @Override
+ public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+ return null;
+ }
+
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}
@Override
- public TransactionBufferStats getStats(boolean lowWaterMarks) {
+ public TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats) {
return null;
}
+ @Override
+ public TransactionBufferStats getStats(boolean lowWaterMarks) {
+ return getStats(lowWaterMarks, false);
+ }
+
+
@Override
public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 5d582d564ea..967f1f16fef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
@@ -173,8 +174,11 @@ public class SingleSnapshotAbortedTxnProcessorImpl
implements AbortedTxnProcesso
}
@Override
- public long getLastSnapshotTimestamps() {
- return this.lastSnapshotTimestamps;
+ public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
+ TransactionBufferStats transactionBufferStats = new
TransactionBufferStats();
+ transactionBufferStats.lastSnapshotTimestamps =
this.lastSnapshotTimestamps;
+ transactionBufferStats.totalAbortedTransactions = aborts.size();
+ return transactionBufferStats;
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index cd6c9c6123a..be1271a155c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -62,6 +62,9 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.SegmentStats;
+import org.apache.pulsar.common.policies.data.SegmentsStats;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
@@ -116,6 +119,8 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
private volatile long lastSnapshotTimestamps;
+ private volatile long lastTakedSnapshotSegmentTimestamp;
+
/**
* The number of the aborted transaction IDs in a segment.
* This is calculated according to the configured memory size.
@@ -451,9 +456,25 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
persistentWorker::clearSnapshotSegmentAndIndexes);
}
- @Override
- public long getLastSnapshotTimestamps() {
- return this.lastSnapshotTimestamps;
+ public TransactionBufferStats generateSnapshotStats(boolean segmentStats) {
+ TransactionBufferStats transactionBufferStats = new
TransactionBufferStats();
+ transactionBufferStats.totalAbortedTransactions = this.aborts.size();
+ transactionBufferStats.lastSnapshotTimestamps =
this.lastSnapshotTimestamps;
+ SegmentsStats segmentsStats = new SegmentsStats();
+ segmentsStats.currentSegmentCapacity = this.snapshotSegmentCapacity;
+ segmentsStats.lastTookSnapshotSegmentTimestamp =
this.lastTakedSnapshotSegmentTimestamp;
+ segmentsStats.unsealedAbortTxnIDSize = this.unsealedTxnIds.size();
+ segmentsStats.segmentsSize = indexes.size();
+ if (segmentStats) {
+ List<SegmentStats> statsList = new ArrayList<>();
+ segmentIndex.forEach((position, txnID) -> {
+ SegmentStats stats = new SegmentStats(txnID.toString(),
position.toString());
+ statsList.add(stats);
+ });
+ segmentsStats.segmentStats = statsList;
+ }
+ transactionBufferStats.segmentsStats = segmentsStats;
+ return transactionBufferStats;
}
@Override
@@ -705,6 +726,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
return
segmentWriter.writeAsync(buildKey(this.sequenceID.get()),
transactionBufferSnapshotSegment);
}).thenCompose((messageId) -> {
+ lastTakedSnapshotSegmentTimestamp = System.currentTimeMillis();
//Build index for this segment
TransactionBufferSnapshotIndex index = new
TransactionBufferSnapshotIndex();
index.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 89a8e95afba..3c13be22086 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -101,6 +101,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final AbortedTxnProcessor snapshotAbortedTxnProcessor;
+ private final AbortedTxnProcessor.SnapshotType snapshotType;
+
public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
@@ -112,8 +114,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
this.maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
if
(topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled())
{
snapshotAbortedTxnProcessor = new
SnapshotSegmentAbortedTxnProcessorImpl(topic);
+ snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
} else {
snapshotAbortedTxnProcessor = new
SingleSnapshotAbortedTxnProcessorImpl(topic);
+ snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.recover();
}
@@ -489,6 +493,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
+ @Override
+ public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+ return snapshotType;
+ }
+
@Override
public PositionImpl getMaxReadPosition() {
if (checkIfReady() || checkIfNoSnapshot()) {
@@ -509,9 +518,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
@Override
- public TransactionBufferStats getStats(boolean lowWaterMarks) {
- TransactionBufferStats transactionBufferStats = new
TransactionBufferStats();
- transactionBufferStats.lastSnapshotTimestamps =
this.snapshotAbortedTxnProcessor.getLastSnapshotTimestamps();
+ public TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats) {
+ TransactionBufferStats transactionBufferStats =
this.snapshotAbortedTxnProcessor
+ .generateSnapshotStats(segmentStats);
+ transactionBufferStats.snapshotType = snapshotType.toString();
transactionBufferStats.state = this.getState().name();
transactionBufferStats.maxReadPosition =
this.maxReadPosition.toString();
if (lowWaterMarks) {
@@ -524,6 +534,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return transactionBufferStats;
}
+ @Override
+ public TransactionBufferStats getStats(boolean lowWaterMarks) {
+ return getStats(lowWaterMarks, false);
+ }
+
@Override
public void run(Timeout timeout) {
if (checkIfReady()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 22ba8e2d2e8..7c74b52951e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
@@ -93,11 +94,21 @@ public class TransactionBufferDisable implements
TransactionBuffer {
return PositionImpl.LATEST;
}
+ @Override
+ public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+ return null;
+ }
+
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}
+ @Override
+ public TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats) {
+ return null;
+ }
+
@Override
public TransactionBufferStats getStats(boolean lowWaterMarks) {
return null;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
index 52aadde7b26..bf51c69fbae 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionMultiBrokerTest.java
@@ -19,13 +19,22 @@
package org.apache.pulsar.broker.admin.v3;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -81,4 +90,41 @@ public class AdminApiTransactionMultiBrokerTest extends
TransactionTestBase {
localAdmin.transactions().getCoordinatorInternalStats(i, false);
}
}
+
+ @Test
+ public void testGetTransactionBufferInternalStatsInMultiBroker() throws
Exception {
+ for (int i = 0; i < super.getBrokerCount(); i++) {
+
getPulsarServiceList().get(i).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+ }
+ String topic1 = NAMESPACE1 +
"/testGetTransactionBufferInternalStatsInMultiBroker";
+ assertTrue(admin.namespaces().getBundles(NAMESPACE1).getNumBundles() >
1);
+ for (int i = 0; true ; i++) {
+ topic1 = topic1 + i;
+ admin.topics().createNonPartitionedTopic(topic1);
+ String segmentTopicBroker = admin.lookups()
+ .lookupTopic(NAMESPACE1 + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ String indexTopicBroker = admin.lookups()
+ .lookupTopic(NAMESPACE1 + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+ if (segmentTopicBroker.equals(indexTopicBroker)) {
+ String topicBroker = admin.lookups().lookupTopic(topic1);
+ if (!topicBroker.equals(segmentTopicBroker)) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic1).create();
+ TransactionBufferInternalStats stats = admin.transactions()
+ .getTransactionBufferInternalStatsAsync(topic1, true).get();
+ assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Segment.toString());
+ assertNull(stats.singleSnapshotSystemTopicInternalStats);
+ assertNotNull(stats.segmentInternalStats);
+ assertTrue(stats.segmentInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+ assertNotNull(stats.segmentIndexInternalStats);
+ assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 0e51470da75..1e5f4679492 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -60,7 +61,9 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -510,7 +513,7 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
TransactionCoordinatorInternalStats stats = admin.transactions()
.getCoordinatorInternalStatsAsync(0, true).get();
-
verifyManagedLegerInternalStats(stats.transactionLogStats.managedLedgerInternalStats,
26);
+
verifyManagedLedgerInternalStats(stats.transactionLogStats.managedLedgerInternalStats,
26);
assertEquals(TopicName.get(TopicDomain.persistent.toString(),
NamespaceName.SYSTEM_NAMESPACE,
MLTransactionLogImpl.TRANSACTION_LOG_PREFIX +
"0").getPersistenceNamingEncoding(),
stats.transactionLogStats.managedLedgerName);
@@ -565,7 +568,7 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
+ subName +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
stats.pendingAckLogStats.managedLedgerName);
- verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);
+ verifyManagedLedgerInternalStats(managedLedgerInternalStats, 16);
ManagedLedgerInternalStats finalManagedLedgerInternalStats =
managedLedgerInternalStats;
managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
@@ -584,6 +587,88 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}
+ @Test(timeOut = 20000)
+ public void testGetTransactionBufferInternalStats() throws Exception {
+ // Initialize transaction
+ initTransaction(1);
+
+ // Create topics
+ final String topic1 =
"persistent://public/default/testGetTransactionBufferInternalStats-1";
+ final String topic2 =
"persistent://public/default/testGetTransactionBufferInternalStats-2";
+ final String topic3 =
"persistent://public/default/testGetTransactionBufferInternalStats-3";
+ pulsar.getConfig().setTransactionCoordinatorEnabled(false);
+ admin.topics().createNonPartitionedTopic(topic1);
+
+ // Verify NotFoundException when transaction coordinator is disabled
+ try {
+
admin.transactions().getTransactionBufferInternalStatsAsync(topic1, true).get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
PulsarAdminException.NotFoundException);
+ }
+
+ // Enable transaction coordinator and disable segmented snapshot
+ pulsar.getConfig().setTransactionCoordinatorEnabled(true);
+ pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Send a message with a transaction and abort it
+ Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic2).create();
+ TransactionImpl transaction = (TransactionImpl) getTransaction();
+ producer.newMessage(transaction).send();
+ transaction.abort().get();
+
+ // Get transaction buffer internal stats and verify single snapshot
stats
+ TransactionBufferInternalStats stats = admin.transactions()
+ .getTransactionBufferInternalStatsAsync(topic2, true).get();
+ assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Single.toString());
+ assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
+
+ // Get managed ledger internal stats for the transaction buffer
snapshot topic
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
+ internalStats);
+
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
+ .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+ assertNull(stats.segmentInternalStats);
+ assertNull(stats.segmentIndexInternalStats);
+
+ // Configure segmented snapshot and set segment size
+ pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
+ pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
+ // Send a message with a transaction and abort it
+ producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic3).create();
+ transaction = (TransactionImpl) getTransaction();
+ producer.newMessage(transaction).send();
+ transaction.abort().get();
+
+ // Get transaction buffer internal stats and verify segmented snapshot
stats
+ stats =
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
+ assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Segment.toString());
+ assertNull(stats.singleSnapshotSystemTopicInternalStats);
+ assertNotNull(stats.segmentInternalStats);
+
+ // Get managed ledger internal stats for the transaction buffer
segments topic
+ internalStats = admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
internalStats);
+ assertTrue(stats.segmentInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+
+ // Get managed ledger internal stats for the transaction buffer
indexes topic
+ assertNotNull(stats.segmentIndexInternalStats);
+ internalStats = admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
internalStats);
+ assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+ }
+
+
+
@Test(timeOut = 20000)
public void testTransactionNotEnabled() throws Exception {
cleanup();
@@ -836,7 +921,7 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
}
- private static void
verifyManagedLegerInternalStats(ManagedLedgerInternalStats
managedLedgerInternalStats,
+ private static void
verifyManagedLedgerInternalStats(ManagedLedgerInternalStats
managedLedgerInternalStats,
long totalSize) {
assertEquals(managedLedgerInternalStats.entriesAddedCounter, 1);
assertEquals(managedLedgerInternalStats.numberOfEntries, 1);
@@ -851,4 +936,20 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
assertNotNull(managedLedgerInternalStats.ledgers.get(0).metadata);
assertEquals(managedLedgerInternalStats.cursors.size(), 1);
}
+
+ private static void
verifyManagedLedgerInternalStats(ManagedLedgerInternalStats internalStats,
+
ManagedLedgerInternalStats persistentTopicStats) {
+ assertEquals(persistentTopicStats.entriesAddedCounter,
internalStats.entriesAddedCounter);
+ assertEquals(persistentTopicStats.numberOfEntries,
internalStats.numberOfEntries);
+ assertEquals(persistentTopicStats.totalSize, internalStats.totalSize);
+ assertEquals(persistentTopicStats.currentLedgerEntries,
internalStats.currentLedgerEntries);
+ assertEquals(persistentTopicStats.currentLedgerSize,
internalStats.currentLedgerSize);
+ assertEquals(persistentTopicStats.lastLedgerCreationFailureTimestamp,
internalStats.lastLedgerCreationFailureTimestamp);
+ assertEquals(persistentTopicStats.waitingCursorsCount,
internalStats.waitingCursorsCount);
+ assertEquals(persistentTopicStats.pendingAddEntriesCount,
internalStats.pendingAddEntriesCount);
+ assertEquals(persistentTopicStats.lastConfirmedEntry,
internalStats.lastConfirmedEntry);
+ assertNotNull(internalStats.ledgers.get(0).metadata);
+ assertEquals(persistentTopicStats.ledgers.size(),
internalStats.ledgers.size());
+ assertEquals(persistentTopicStats.cursors.size(),
internalStats.cursors.size());
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index 9e1c1992477..6e763cb44fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.broker.transaction;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.NavigableMap;
@@ -29,6 +31,7 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -47,8 +50,9 @@ import
org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxn
import
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -56,6 +60,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
@@ -256,6 +261,85 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
processor.closeAsync().get(5, TimeUnit.SECONDS);
}
+ @Test
+ public void testTxnSegmentStats() throws Exception {
+ // Set up test environment
+ String namespace = TENANT + "/testTxnSegmentStats";
+ String topic = "persistent://" + namespace + "/testTxnSegmentStats";
+
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
topic.length() + SEGMENT_SIZE * 3);
+
+ // Create necessary resources
+ Transactions transactions = admin.transactions();
+ admin.namespaces().createNamespace(namespace);
+ admin.topics().createNonPartitionedTopic(topic);
+
+ // Prepare topic, producer, and consumer
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ @Cleanup
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("my-sub").subscribe();
+
+ // Record the start time of the test
+ long testStartTime = System.currentTimeMillis();
+
+ Transaction transaction = null;
+ // Send messages with transactions and abort them
+ for (int i = 0; i < SEGMENT_SIZE; i++) {
+ transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS).build().get();
+ producer.newMessage(transaction).send();
+ transaction.abort().get();
+ }
+
+ Transaction txn =
pulsarClient.newTransaction().withTransactionTimeout(5,
TimeUnit.HOURS).build().get();
+ producer.newMessage(txn).send();
+ txn.abort().get();
+
+ // Get the transaction buffer stats without segment stats
+ TransactionBufferStats statsWithoutSegmentStats = transactions
+ .getTransactionBufferStats(topic, false, false);
+ assertNotNull(statsWithoutSegmentStats);
+ assertNotNull(statsWithoutSegmentStats.segmentsStats);
+ assertNull(statsWithoutSegmentStats.segmentsStats.segmentStats);
+ assertEquals(statsWithoutSegmentStats.snapshotType,
AbortedTxnProcessor.SnapshotType.Segment.toString());
+
+ // Verify the segment stats
+ assertEquals(statsWithoutSegmentStats.segmentsStats.segmentsSize, 1L);
+
assertEquals(statsWithoutSegmentStats.segmentsStats.unsealedAbortTxnIDSize, 1L);
+
assertEquals(statsWithoutSegmentStats.segmentsStats.currentSegmentCapacity,
SEGMENT_SIZE);
+ assertEquals(statsWithoutSegmentStats.totalAbortedTransactions,
SEGMENT_SIZE + 1);
+
assertTrue(statsWithoutSegmentStats.segmentsStats.lastTookSnapshotSegmentTimestamp
>= testStartTime);
+
+ // Get the transaction buffer stats with segment stats
+ TransactionBufferStats statsWithSegmentStats = transactions
+ .getTransactionBufferStats(topic, false, true);
+ assertNotNull(statsWithSegmentStats);
+ assertNotNull(statsWithSegmentStats.segmentsStats.segmentStats);
+
+ // Verify if the segment stats are present when requested
+ assertEquals(statsWithSegmentStats.segmentsStats.segmentStats.size(),
1);
+
assertEquals(statsWithSegmentStats.segmentsStats.segmentStats.get(0).lastTxnID,
+ transaction.getTxnID().toString());
+
+ // Verify multiple segments
+ for (int i = 0; i < SEGMENT_SIZE * 3; i++) {
+ transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS).build().get();
+ producer.newMessage(transaction).send();
+ transaction.abort().get();
+ }
+ statsWithSegmentStats = transactions.getTransactionBufferStats(topic,
false, true);
+
+ // Verify the segment stats
+ assertEquals(statsWithSegmentStats.segmentsStats.segmentsSize, 4L);
+
assertEquals(statsWithSegmentStats.segmentsStats.unsealedAbortTxnIDSize, 1L);
+ assertEquals(statsWithSegmentStats.totalAbortedTransactions,
SEGMENT_SIZE * 4 + 1);
+
+ // Reset the configuration
+ this.pulsarService.getConfig()
+ .setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() + SEGMENT_SIZE * 3);
+ }
+
private void verifySnapshotSegmentsSize(String topic, int size) throws
Exception {
SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader =
pulsarService.getTransactionBufferSnapshotServiceFactory()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 4e50401fc11..cf389824794 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -743,7 +743,7 @@ public class TransactionTest extends TransactionTestBase {
TransactionBuffer buffer2 = new
TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
- assertEquals(buffer2.getStats(false).state, "Ready"));
+ assertEquals(buffer2.getStats(false, false).state, "Ready"));
managedCursors.removeCursor("transaction-buffer-sub");
doAnswer(invocation -> {
@@ -755,7 +755,7 @@ public class TransactionTest extends TransactionTestBase {
managedCursors.add(managedCursor,
managedCursor.getMarkDeletedPosition());
TransactionBuffer buffer3 = new
TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
- assertEquals(buffer3.getStats(false).state, "Ready"));
+ assertEquals(buffer3.getStats(false, false).state, "Ready"));
persistentTopic.getInternalStats(false).thenAccept(internalStats -> {
assertTrue(internalStats.cursors.isEmpty());
});
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index cd0c089ad41..c0300c63b35 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -58,6 +58,7 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
public static final String CLUSTER_NAME = "test";
@Setter
+ @Getter
private int brokerCount = 3;
@Getter
private final List<ServiceConfiguration> serviceConfigurationList = new
ArrayList<>();
@@ -116,7 +117,7 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
createTransactionCoordinatorAssign(numPartitionsOfTC);
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
- admin.namespaces().createNamespace(NAMESPACE1);
+ admin.namespaces().createNamespace(NAMESPACE1, 4);
if (topic != null) {
if (numPartitions == 0) {
admin.topics().createNonPartitionedTopic(topic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index 55d115905a3..7493b25ac1d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -235,10 +235,10 @@ public class TransactionStablePositionTest extends
TransactionTestBase {
Awaitility.await().until(() -> {
if (clientEnableTransaction) {
// recover success, client enable transaction will change to
Ready State
- return
topicTransactionBuffer.getStats(false).state.equals(Ready.name());
+ return topicTransactionBuffer.getStats(false,
false).state.equals(Ready.name());
} else {
// recover success, client disable transaction will change to
NoSnapshot State
- return
topicTransactionBuffer.getStats(false).state.equals(NoSnapshot.name());
+ return topicTransactionBuffer.getStats(false,
false).state.equals(NoSnapshot.name());
}
});
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 57adf263a57..b0504bee744 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import
org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -139,10 +140,24 @@ public interface Transactions {
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
- * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction pending ack.
+ * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction buffer.
+ * @param segmentStats Whether to get segment statistics.
* @return the future stats of transaction buffer in topic.
*/
- CompletableFuture<TransactionBufferStats>
getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks);
+ CompletableFuture<TransactionBufferStats>
getTransactionBufferStatsAsync(String topic, boolean lowWaterMarks,
+
boolean segmentStats);
+
+ /**
+ * Get transaction buffer stats.
+ *
+ * @param topic the topic of getting transaction buffer stats
+ * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction buffer.
+ * @return the future stats of transaction buffer in topic.
+ */
+ default CompletableFuture<TransactionBufferStats>
getTransactionBufferStatsAsync(String topic,
+
boolean lowWaterMarks) {
+ return getTransactionBufferStatsAsync(topic, lowWaterMarks, false);
+ }
/**
* Get transaction buffer stats.
@@ -151,17 +166,31 @@ public interface Transactions {
* @return the future stats of transaction buffer in topic.
*/
default CompletableFuture<TransactionBufferStats>
getTransactionBufferStatsAsync(String topic) {
- return getTransactionBufferStatsAsync(topic, false);
+ return getTransactionBufferStatsAsync(topic, false, false);
}
/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
- * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction buffer.
+ * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction buffer.
+ * @param segmentStats Whether to get segment statistics.
* @return the stats of transaction buffer in topic.
*/
- TransactionBufferStats getTransactionBufferStats(String topic, boolean
lowWaterMarks) throws PulsarAdminException;
+ TransactionBufferStats getTransactionBufferStats(String topic, boolean
lowWaterMarks,
+ boolean segmentStats)
throws PulsarAdminException;
+
+ /**
+ * Get transaction buffer stats.
+ *
+ * @param topic the topic of getting transaction buffer stats
+ * @param lowWaterMarks Whether to get information about lowWaterMarks
stored in transaction buffer.
+ * @return the stats of transaction buffer in topic.
+ */
+ default TransactionBufferStats getTransactionBufferStats(String topic,
+ boolean
lowWaterMarks) throws PulsarAdminException {
+ return getTransactionBufferStats(topic, lowWaterMarks, false);
+ }
/**
* Get transaction buffer stats.
@@ -170,7 +199,7 @@ public interface Transactions {
* @return the stats of transaction buffer in topic.
*/
default TransactionBufferStats getTransactionBufferStats(String topic)
throws PulsarAdminException {
- return getTransactionBufferStats(topic, false);
+ return getTransactionBufferStats(topic, false, false);
}
/**
@@ -309,6 +338,28 @@ public interface Transactions {
TransactionPendingAckInternalStats getPendingAckInternalStats(String
topic, String subName,
boolean
metadata) throws PulsarAdminException;
+ /**
+ * Get transaction buffer internal stats asynchronously.
+ *
+ * @param topic the topic to get transaction buffer internal stats from
+ * @param metadata whether to obtain ledger metadata
+ *
+ * @return the future internal stats of transaction buffer
+ */
+ CompletableFuture<TransactionBufferInternalStats>
getTransactionBufferInternalStatsAsync(String topic,
+
boolean metadata);
+
+ /**
+ * Get transaction buffer internal stats.
+ *
+ * @param topic the topic to get transaction buffer internal stats from
+ * @param metadata whether to obtain ledger metadata
+ *
+ * @return the internal stats of transaction buffer
+ */
+ TransactionBufferInternalStats getTransactionBufferInternalStats(String
topic,
+ boolean
metadata) throws PulsarAdminException;
+
/**
* Sets the scale of the transaction coordinators.
* And currently, we can only support scale-up.
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
similarity index 51%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
index 73d66b8c230..007f4f4d632 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentStats.java
@@ -18,30 +18,15 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
+public class SegmentStats {
+ public String lastTxnID;
+ public String persistentPosition;
-public class TransactionBufferStats {
+ public SegmentStats(String lastTxnID, String persistentPosition) {
+ this.lastTxnID = lastTxnID;
+ this.persistentPosition = persistentPosition;
+ }
- /** The state of this transaction buffer. */
- public String state;
-
- /** The max read position of this transaction buffer. */
- public String maxReadPosition;
-
- /** The last snapshot timestamps of this transaction buffer. */
- public long lastSnapshotTimestamps;
-
- /**
- * (Optional) The lowWaterMark details of the transaction buffer.
- */
- public Map<Long, Long> lowWaterMarks;
- /**
- * The total number of ongoing transactions in this transaction buffer.
- */
- public long ongoingTxnSize;
-
- //Start timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverStartTime;
- //End timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverEndTime;
+ public SegmentStats() {
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
similarity index 51%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
index 73d66b8c230..46422c0b67b 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SegmentsStats.java
@@ -18,30 +18,20 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
+import java.util.List;
-public class TransactionBufferStats {
+public class SegmentsStats {
+ // The current number of the snapshot segments.
+ public long segmentsSize;
- /** The state of this transaction buffer. */
- public String state;
+ // The capacity of snapshot segment calculated by the current config
(transactionBufferSnapshotSegmentSize)
+ public long currentSegmentCapacity;
- /** The max read position of this transaction buffer. */
- public String maxReadPosition;
+ // The number of latest aborted txn IDs not yet sealed into a segment; less than currentSegmentCapacity
+ public long unsealedAbortTxnIDSize;
- /** The last snapshot timestamps of this transaction buffer. */
- public long lastSnapshotTimestamps;
-
- /**
- * (Optional) The lowWaterMark details of the transaction buffer.
- */
- public Map<Long, Long> lowWaterMarks;
- /**
- * The total number of ongoing transactions in this transaction buffer.
- */
- public long ongoingTxnSize;
-
- //Start timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverStartTime;
- //End timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverEndTime;
+ // A list of individual segment stats
+ public List<SegmentStats> segmentStats;
+ /** The timestamp at which this transaction buffer last took a snapshot segment. */
+ public long lastTookSnapshotSegmentTimestamp;
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
similarity index 51%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
index 73d66b8c230..7ce95375e72 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SnapshotSystemTopicInternalStats.java
@@ -18,30 +18,10 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
+public class SnapshotSystemTopicInternalStats {
+ // The managed ledger name for the snapshot system topic (single snapshot, segment, or segment index topic).
+ public String managedLedgerName;
-public class TransactionBufferStats {
-
- /** The state of this transaction buffer. */
- public String state;
-
- /** The max read position of this transaction buffer. */
- public String maxReadPosition;
-
- /** The last snapshot timestamps of this transaction buffer. */
- public long lastSnapshotTimestamps;
-
- /**
- * (Optional) The lowWaterMark details of the transaction buffer.
- */
- public Map<Long, Long> lowWaterMarks;
- /**
- * The total number of ongoing transactions in this transaction buffer.
- */
- public long ongoingTxnSize;
-
- //Start timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverStartTime;
- //End timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverEndTime;
+ // The managed ledger internal stats for the snapshot system topic (single snapshot, segment, or
segment index topic).
+ public ManagedLedgerInternalStats managedLedgerInternalStats;
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
similarity index 51%
copy from
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
copy to
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
index 73d66b8c230..b4c9e096d0a 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferInternalStats.java
@@ -18,30 +18,16 @@
*/
package org.apache.pulsar.common.policies.data;
-import java.util.Map;
+public class TransactionBufferInternalStats {
+ // The type of snapshot being used: either "Single" or "Segment"
+ public String snapshotType;
-public class TransactionBufferStats {
+ // If snapshotType is "Single", this field will provide the statistics of
single snapshot log.
+ public SnapshotSystemTopicInternalStats
singleSnapshotSystemTopicInternalStats;
- /** The state of this transaction buffer. */
- public String state;
+ // If snapshotType is "Segment", this field will provide the statistics of
snapshot segment topic.
+ public SnapshotSystemTopicInternalStats segmentInternalStats;
- /** The max read position of this transaction buffer. */
- public String maxReadPosition;
-
- /** The last snapshot timestamps of this transaction buffer. */
- public long lastSnapshotTimestamps;
-
- /**
- * (Optional) The lowWaterMark details of the transaction buffer.
- */
- public Map<Long, Long> lowWaterMarks;
- /**
- * The total number of ongoing transactions in this transaction buffer.
- */
- public long ongoingTxnSize;
-
- //Start timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverStartTime;
- //End timestamp of transaction buffer recovery. 0L means no startup.
- public long recoverEndTime;
+ // If snapshotType is "Segment", this field will provide the statistics of
snapshot segment index topic.
+ public SnapshotSystemTopicInternalStats segmentIndexInternalStats;
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
index 73d66b8c230..1dffa0dd614 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
@@ -44,4 +44,13 @@ public class TransactionBufferStats {
public long recoverStartTime;
//End timestamp of transaction buffer recovery. 0L means no startup.
public long recoverEndTime;
+
+ // The total number of aborted transactions.
+ public long totalAbortedTransactions;
+
+ // The type of snapshot being used: either "Single" or "Segment"
+ public String snapshotType;
+
+ // If snapshotType is "Segment", this field will provide additional segment-related statistics
+ public SegmentsStats segmentsStats;
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 5693ebc8f60..2d1dd408ef6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
@@ -132,17 +133,20 @@ public class TransactionsImpl extends BaseResource implements Transactions {
@Override
public CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic,
- boolean lowWaterMarks) {
+ boolean lowWaterMarks,
+ boolean segmentStats) {
WebTarget path = adminV3Transactions.path("transactionBufferStats");
path = path.path(TopicName.get(topic).getRestPath(false));
path = path.queryParam("lowWaterMarks", lowWaterMarks);
+ path = path.queryParam("segmentStats", segmentStats);
return asyncGetRequest(path, new FutureCallback<TransactionBufferStats>(){});
}
@Override
public TransactionBufferStats getTransactionBufferStats(String topic,
- boolean lowWaterMarks) throws PulsarAdminException {
- return sync(() -> getTransactionBufferStatsAsync(topic, lowWaterMarks));
+ boolean lowWaterMarks,
+ boolean segmentStats) throws PulsarAdminException {
+ return sync(() -> getTransactionBufferStatsAsync(topic, lowWaterMarks, segmentStats));
}
@Override
@@ -227,6 +231,22 @@ public class TransactionsImpl extends BaseResource implements Transactions {
return sync(() -> getPendingAckInternalStatsAsync(topic, subName, metadata));
}
+ @Override
+ public CompletableFuture<TransactionBufferInternalStats> getTransactionBufferInternalStatsAsync(String topic,
+ boolean metadata) {
+ TopicName tn = TopicName.get(topic);
+ WebTarget path = adminV3Transactions.path("transactionBufferInternalStats");
+ path = path.path(tn.getRestPath(false));
+ path = path.queryParam("metadata", metadata);
+ return asyncGetRequest(path, new FutureCallback<TransactionBufferInternalStats>(){});
+ }
+
+ @Override
+ public TransactionBufferInternalStats getTransactionBufferInternalStats(String topic, boolean metadata)
+ throws PulsarAdminException {
+ return sync(() -> getTransactionBufferInternalStatsAsync(topic, metadata));
+ }
+
@Override
public void scaleTransactionCoordinators(int replicas) throws PulsarAdminException {
sync(() -> scaleTransactionCoordinatorsAsync(replicas));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 8509d037cba..a722abe19df 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2377,7 +2377,7 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("transaction-buffer-stats -t test -l"));
- verify(transactions).getTransactionBufferStats("test", true);
+ verify(transactions).getTransactionBufferStats("test", true, false);
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("pending-ack-stats -t test -s test -l"));
@@ -2387,6 +2387,10 @@ public class PulsarAdminToolTest {
cmdTransactions.run(split("pending-ack-internal-stats -t test -s test"));
verify(transactions).getPendingAckInternalStats("test", "test", false);
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("buffer-snapshot-internal-stats -t test"));
+ verify(transactions).getTransactionBufferInternalStats("test", false);
+
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
verify(transactions).scaleTransactionCoordinators(3);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 08ffba1451f..b999e30b108 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -56,9 +56,14 @@ public class CmdTransactions extends CmdBase {
description = "Whether to get information about lowWaterMarks stored in transaction buffer.")
private boolean lowWaterMark;
+ @Parameter(names = {"-s", "--segment-stats"},
+ description = "Whether to get segment statistics.")
+ private boolean segmentStats = false;
+
@Override
void run() throws Exception {
- print(getAdmin().transactions().getTransactionBufferStats(topic, lowWaterMark));
+ // Assuming getTransactionBufferStats method signature has been updated to accept the new parameter
+ print(getAdmin().transactions().getTransactionBufferStats(topic, lowWaterMark, segmentStats));
}
}
@@ -188,6 +193,20 @@ public class CmdTransactions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get transaction buffer internal stats")
+ private class GetTransactionBufferInternalStats extends CliCommand {
+ @Parameter(names = {"-t", "--topic"}, description = "Topic name", required = true)
+ private String topic;
+
+ @Parameter(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata")
+ private boolean metadata = false;
+
+ @Override
+ void run() throws Exception {
+ print(getAdmin().transactions().getTransactionBufferInternalStats(topic, metadata));
+ }
+ }
+
@Parameters(commandDescription = "Update the scale of transaction coordinators")
private class ScaleTransactionCoordinators extends CliCommand {
@Parameter(names = { "-r", "--replicas" }, description = "The scale of the transaction coordinators")
@@ -242,6 +261,7 @@ public class CmdTransactions extends CmdBase {
super("transactions", admin);
jcommander.addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats());
jcommander.addCommand("pending-ack-internal-stats", new GetPendingAckInternalStats());
+ jcommander.addCommand("buffer-snapshot-internal-stats", new GetTransactionBufferInternalStats());
jcommander.addCommand("coordinator-stats", new GetCoordinatorStats());
jcommander.addCommand("transaction-buffer-stats", new GetTransactionBufferStats());
jcommander.addCommand("pending-ack-stats", new GetPendingAckStats());