This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e42503  [Transaction] Transaction admin api transaction in buffer 
stats (#10642)
3e42503 is described below

commit 3e425035bd96071be5ae18cf60dc8af36caabae0
Author: congbo <[email protected]>
AuthorDate: Mon May 24 16:37:26 2021 +0800

    [Transaction] Transaction admin api transaction in buffer stats (#10642)
    
    ## Motivation
    Transaction add admin api `getTransactionInBufferStats`
    
    ## implement
    ```
    @Data
    public class TransactionInBufferStats {
    
        /** The start position of this transaction in transaction buffer. */
        public String startPosition;
    
        /** The flag of this transaction have been aborted. */
        public boolean aborted;
    }
    ```
    This is transaction buffer metrics.
---
 .../pulsar/broker/admin/impl/TransactionsBase.java | 56 +++++++++++++++--
 .../pulsar/broker/admin/v3/Transactions.java       | 24 ++++++++
 .../broker/service/persistent/PersistentTopic.java |  5 ++
 .../transaction/buffer/TransactionBuffer.java      |  7 +++
 .../buffer/impl/InMemTransactionBuffer.java        |  6 ++
 .../buffer/impl/TopicTransactionBuffer.java        | 11 ++++
 .../buffer/impl/TransactionBufferDisable.java      |  6 ++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 71 +++++++++++++++-------
 .../apache/pulsar/client/admin/Transactions.java   | 17 +++++-
 .../client/admin/internal/TransactionsImpl.java    | 40 +++++++++---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 10 ++-
 .../apache/pulsar/admin/cli/CmdTransactions.java   | 21 ++++++-
 .../data/TransactionCoordinatorStatus.java         |  5 +-
 ...orStatus.java => TransactionInBufferStats.java} | 19 ++----
 .../impl/MLTransactionMetadataStore.java           |  3 +-
 15 files changed, 239 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 044650f..aab750b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,19 +18,27 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
+import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
 import com.google.common.collect.Lists;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 
@@ -64,23 +72,23 @@ public abstract class TransactionsBase extends 
AdminResource {
                             return;
                         }
                     }
-                    List<TransactionCoordinatorStatus> metadataStoreInfoList = 
new ArrayList<>();
+                    Map<Integer, TransactionCoordinatorStatus> status = new 
HashMap<>();
                     
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
                         if (e != null) {
                             asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        for (CompletableFuture<TransactionCoordinatorStatus> 
transactionMetadataStoreInfoFuture
-                                : transactionMetadataStoreInfoFutures) {
+                        for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
                             try {
-                                
metadataStoreInfoList.add(transactionMetadataStoreInfoFuture.get());
+                                status.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
                             } catch (Exception exception) {
                                 asyncResponse.resume(new 
RestException(exception.getCause()));
                                 return;
                             }
                         }
-                        asyncResponse.resume(metadataStoreInfoList);
+
+                        asyncResponse.resume(status);
                     });
                 }).exceptionally(ex -> {
                     log.error("[{}] Failed to get transaction coordinator 
state.", clientAppId(), ex);
@@ -93,4 +101,40 @@ public abstract class TransactionsBase extends 
AdminResource {
                     "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
         }
     }
+
+    protected void internalGetTransactionInBufferStats(AsyncResponse 
asyncResponse, boolean authoritative,
+                                                       long mostSigBits, long 
leastSigBits,
+                                                       String topic) {
+        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+            validateTopicOwnership(TopicName.get(topic), authoritative);
+            CompletableFuture<Optional<Topic>> topicFuture = 
pulsar().getBrokerService()
+                    .getTopics().get(TopicName.get(topic).toString());
+            if (topicFuture != null) {
+                topicFuture.whenComplete((optionalTopic, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+                    if (!optionalTopic.isPresent()) {
+                        asyncResponse.resume(new 
RestException(INTERNAL_SERVER_ERROR,
+                                "Topic don't owner by this broker!"));
+                        return;
+                    }
+                    Topic topicObject = optionalTopic.get();
+                    if (topicObject instanceof PersistentTopic) {
+                        TransactionInBufferStats transactionInBufferStats = 
((PersistentTopic) topicObject)
+                                .getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits));
+                        asyncResponse.resume(transactionInBufferStats);
+                    } else {
+                        asyncResponse.resume(new 
RestException(NOT_IMPLEMENTED, "Topic is not a persistent topic!"));
+                    }
+                });
+            } else {
+                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, 
"Topic don't owner by this broker!"));
+            }
+        } else {
+            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
+                    "This Broker is not configured with 
transactionCoordinatorEnabled=true."));
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 1fece00..bad39fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import javax.ws.rs.Consumes;
@@ -53,4 +54,27 @@ public class Transactions extends TransactionsBase {
                                      @QueryParam("coordinatorId") Integer 
coordinatorId) {
         internalGetCoordinatorStatus(asyncResponse, authoritative, 
coordinatorId);
     }
+
+    @GET
+    @Path("/transactionInBufferStats")
+    @ApiOperation(value = "Get transaction state in transaction buffer.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic don't owner by this 
broker!"),
+            @ApiResponse(code = 501, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getTransactionInBufferStats(@Suspended final AsyncResponse 
asyncResponse,
+                                            @QueryParam("authoritative")
+                                            @DefaultValue("false") boolean 
authoritative,
+                                            @QueryParam("mostSigBits")
+                                            @ApiParam(value = "Most sig bits 
of this transaction", required = true)
+                                                    long mostSigBits,
+                                            @ApiParam(value = "Least sig bits 
of this transaction", required = true)
+                                            @QueryParam("leastSigBits") long 
leastSigBits,
+                                            @ApiParam(value = "Topic", 
required = true)
+                                            @QueryParam("topic") String topic) 
{
+        internalGetTransactionInBufferStats(asyncResponse, authoritative, 
mostSigBits, leastSigBits, topic);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7344f59..12d54a8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -132,6 +132,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -3065,6 +3066,10 @@ public class PersistentTopic extends AbstractTopic
         return this.transactionBuffer.isTxnAborted(txnID);
     }
 
+    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+        return this.transactionBuffer.getTransactionInBufferStats(txnID);
+    }
+
     @Override
     protected boolean isTerminated() {
         return ledger.isTerminated();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 02e06cc..a201219 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 
 /**
  * A class represent a transaction buffer. The transaction buffer
@@ -147,4 +148,10 @@ public interface TransactionBuffer {
      * @return the stable position.
      */
     PositionImpl getMaxReadPosition();
+
+    /**
+     * Get transaction in buffer stats.
+     * @return the transaction in buffer stats.
+     */
+    TransactionInBufferStats getTransactionInBufferStats(TxnID txnID);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index f5813fd..42413a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSeal
 import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
 import 
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 
@@ -364,4 +365,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
         return PositionImpl.latest;
     }
 
+    @Override
+    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+        return null;
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index abf71e9..e62d304 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.jctools.queues.MessagePassingQueue;
@@ -386,6 +387,16 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     }
 
     @Override
+    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+        TransactionInBufferStats transactionInBufferStats = new 
TransactionInBufferStats();
+        transactionInBufferStats.aborted = isTxnAborted(txnID);
+        if (ongoingTxns.containsKey(txnID)) {
+            transactionInBufferStats.startPosition = 
ongoingTxns.get(txnID).toString();
+        }
+        return transactionInBufferStats;
+    }
+
+    @Override
     public void run(Timeout timeout) {
         if (checkIfReady()) {
             takeSnapshotByTimeout();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 26d0d72..9cb52f1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -29,6 +29,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.util.FutureUtil;
 
 /**
@@ -85,4 +86,9 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
     public PositionImpl getMaxReadPosition() {
         return PositionImpl.latest;
     }
+
+    @Override
+    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+        return null;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 793788a..0daa54a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,23 +19,32 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import com.google.common.collect.Sets;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import 
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
-import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
 
@@ -68,32 +77,52 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
         getTransaction().abort().get();
         TransactionCoordinatorStatus transactionCoordinatorStatus =
                 admin.transactions().getCoordinatorStatusById(1).get();
-        verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
-                transactionCoordinatorStatus.state,
-                transactionCoordinatorStatus.sequenceId, 
transactionCoordinatorStatus.lowWaterMark);
+        verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+                transactionCoordinatorStatus.leastSigBits, 
transactionCoordinatorStatus.lowWaterMark);
 
         transactionCoordinatorStatus = 
admin.transactions().getCoordinatorStatusById(0).get();
-        verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
-                transactionCoordinatorStatus.state,
-                transactionCoordinatorStatus.sequenceId, 
transactionCoordinatorStatus.lowWaterMark);
-        List<TransactionCoordinatorStatus> list = 
admin.transactions().getCoordinatorStatusList().get();
+        verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+                transactionCoordinatorStatus.leastSigBits, 
transactionCoordinatorStatus.lowWaterMark);
+        Map<Integer, TransactionCoordinatorStatus> status = 
admin.transactions().getCoordinatorStatus().get();
+
+        assertEquals(status.size(), 2);
+
+        transactionCoordinatorStatus = status.get(0);
+        verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+                transactionCoordinatorStatus.leastSigBits, 
transactionCoordinatorStatus.lowWaterMark);
 
-        assertEquals(list.size(), 2);
+        transactionCoordinatorStatus = status.get(1);
+        verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+                transactionCoordinatorStatus.leastSigBits, 
transactionCoordinatorStatus.lowWaterMark);
+    }
+
+    @Test(timeOut = 20000)
+    public void testGetTransactionInBufferStats() throws Exception {
+        initTransaction(2);
+        TransactionImpl transaction = (TransactionImpl) getTransaction();
+        final String topic = 
"persistent://public/default/testGetTransactionInBufferStats";
+        admin.topics().createNonPartitionedTopic(topic);
+        Producer<byte[]> producer = 
pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, 
TimeUnit.SECONDS).create();
+        MessageId messageId = producer.newMessage(transaction).value("Hello 
pulsar!".getBytes()).send();
+        TransactionInBufferStats transactionInBufferStats = 
admin.transactions()
+                .getTransactionInBufferStats(new 
TxnID(transaction.getTxnIdMostBits(),
+                        transaction.getTxnIdLeastBits()), topic).get();
+        PositionImpl position =
+                PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), 
((MessageIdImpl) messageId).getEntryId());
+        assertEquals(transactionInBufferStats.startPosition, 
position.toString());
+        assertFalse(transactionInBufferStats.aborted);
 
-        transactionCoordinatorStatus = list.get(0);
-        verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
-                transactionCoordinatorStatus.state,
-                transactionCoordinatorStatus.sequenceId, 
transactionCoordinatorStatus.lowWaterMark);
+        transaction.abort().get();
 
-        transactionCoordinatorStatus = list.get(1);
-        verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
-                transactionCoordinatorStatus.state,
-                transactionCoordinatorStatus.sequenceId, 
transactionCoordinatorStatus.lowWaterMark);
+        transactionInBufferStats = admin.transactions()
+                .getTransactionInBufferStats(new 
TxnID(transaction.getTxnIdMostBits(),
+                        transaction.getTxnIdLeastBits()), topic).get();
+        assertNull(transactionInBufferStats.startPosition);
+        assertTrue(transactionInBufferStats.aborted);
     }
 
-    private static void verifyCoordinatorStatus(long expectedCoordinatorId, 
long coordinatorId, String state,
+    private static void verifyCoordinatorStatus(String state,
                                                 long sequenceId, long 
lowWaterMark) {
-        assertEquals(coordinatorId, expectedCoordinatorId);
         assertEquals(state, "Ready");
         assertEquals(sequenceId, 0);
         assertEquals(lowWaterMark, 0);
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 2719796..147ffcc 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.client.admin;
 
-import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 
 public interface Transactions {
 
@@ -35,8 +37,17 @@ public interface Transactions {
     /**
      * Get transaction metadataStore status.
      *
-     * @return the list future of transaction metadata store status.
+     * @return the map future of transaction metadata store status.
+     */
+    CompletableFuture<Map<Integer, TransactionCoordinatorStatus>> 
getCoordinatorStatus();
+
+    /**
+     * Get transaction in buffer stats.
+     *
+     * @param txnID the txnId
+     * @param topic the produce topic
+     * @return the future stats of transaction in buffer.
      */
-    CompletableFuture<List<TransactionCoordinatorStatus>> 
getCoordinatorStatusList();
+    CompletableFuture<TransactionInBufferStats> 
getTransactionInBufferStats(TxnID txnID, String topic);
 
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 3779ac3..43c5ccf 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -18,13 +18,15 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
-import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.Transactions;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 
 public class TransactionsImpl extends BaseResource implements Transactions {
     private final WebTarget adminV3Transactions;
@@ -55,22 +57,44 @@ public class TransactionsImpl extends BaseResource 
implements Transactions {
     }
 
     @Override
-    public CompletableFuture<List<TransactionCoordinatorStatus>> 
getCoordinatorStatusList() {
+    public CompletableFuture<Map<Integer, TransactionCoordinatorStatus>> 
getCoordinatorStatus() {
         WebTarget path = adminV3Transactions.path("coordinatorStatus");
-        final CompletableFuture<List<TransactionCoordinatorStatus>> statusList 
= new CompletableFuture<>();
+        final CompletableFuture<Map<Integer, TransactionCoordinatorStatus>> 
status = new CompletableFuture<>();
         asyncGetRequest(path,
-                new InvocationCallback<List<TransactionCoordinatorStatus>>() {
+                new InvocationCallback<Map<Integer, 
TransactionCoordinatorStatus>>() {
                     @Override
-                    public void completed(List<TransactionCoordinatorStatus> 
topics) {
-                        statusList.complete(topics);
+                    public void completed(Map<Integer, 
TransactionCoordinatorStatus> topics) {
+                        status.complete(topics);
                     }
 
                     @Override
                     public void failed(Throwable throwable) {
-                        
statusList.completeExceptionally(getApiException(throwable.getCause()));
+                        
status.completeExceptionally(getApiException(throwable.getCause()));
                     }
                 });
-        return statusList;
+        return status;
+    }
+
+    @Override
+    public CompletableFuture<TransactionInBufferStats> 
getTransactionInBufferStats(TxnID txnID, String topic) {
+        WebTarget path = adminV3Transactions.path("transactionInBufferStats");
+        path = path.queryParam("mostSigBits", txnID.getMostSigBits());
+        path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
+        path = path.queryParam("topic", topic);
+        final CompletableFuture<TransactionInBufferStats> future = new 
CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<TransactionInBufferStats>() {
+                    @Override
+                    public void completed(TransactionInBufferStats stats) {
+                        future.complete(stats);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        
future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
     }
 
 }
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9a83204..b5356bc 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,7 +48,6 @@ import 
org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.Lookup;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.NonPersistentTopics;
-import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.ProxyStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.ResourceQuotas;
@@ -60,6 +59,7 @@ import 
org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
@@ -1421,7 +1421,7 @@ public class PulsarAdminToolTest {
         CompletableFuture<List<TransactionCoordinatorStatus>> lists = 
mock(CompletableFuture.class);
         doReturn(transactions).when(admin).transactions();
         
doReturn(transactionMetadataStoreInfo).when(transactions).getCoordinatorStatusById(1);
-        doReturn(lists).when(transactions).getCoordinatorStatusList();
+        doReturn(lists).when(transactions).getCoordinatorStatus();
 
         CmdTransactions cmdTransactions = new CmdTransactions(() -> admin);
 
@@ -1430,7 +1430,11 @@ public class PulsarAdminToolTest {
 
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("coordinator-status"));
-        verify(transactions).getCoordinatorStatusList();
+        verify(transactions).getCoordinatorStatus();
+
+        cmdTransactions = new CmdTransactions(() -> admin);
+        cmdTransactions.run(split("transaction-in-buffer-stats -m 1 -t test -l 
2"));
+        verify(transactions).getTransactionInBufferStats(new TxnID(1, 2), 
"test");
     }
 
     String[] split(String s) {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index f1a133c..9ede39c 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import java.util.function.Supplier;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.transaction.TxnID;
 
 @Parameters(commandDescription = "Operations on transactions")
 public class CmdTransactions extends CmdBase {
@@ -36,13 +37,31 @@ public class CmdTransactions extends CmdBase {
             if (coordinatorId != null) {
                 
print(getAdmin().transactions().getCoordinatorStatusById(coordinatorId));
             } else {
-                print(getAdmin().transactions().getCoordinatorStatusList());
+                print(getAdmin().transactions().getCoordinatorStatus());
             }
         }
     }
 
+    @Parameters(commandDescription = "Get transaction in buffer stats")
+    private class GetTransactionInBufferStats extends CliCommand {
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most 
sig bits", required = true)
+        private int mostSigBits;
+
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "the 
least sig bits", required = true)
+        private long leastSigBits;
+
+        @Parameter(names = {"-t", "--topic"}, description = "the topic", 
required = true)
+        private String topic;
+
+        @Override
+        void run() throws Exception {
+            print(getAdmin().transactions().getTransactionInBufferStats(new 
TxnID(mostSigBits, leastSigBits), topic));
+        }
+    }
+
     public CmdTransactions(Supplier<PulsarAdmin> admin) {
         super("transactions", admin);
         jcommander.addCommand("coordinator-status", new 
GetCoordinatorStatus());
+        jcommander.addCommand("transaction-in-buffer-stats", new 
GetTransactionInBufferStats());
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
index 63c7846..033a66a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
@@ -26,14 +26,11 @@ import lombok.Data;
 @Data
 public class TransactionCoordinatorStatus {
 
-    /** The transaction coordinatorId. */
-    public long coordinatorId;
-
     /** The state of this transaction metadataStore. */
     public String state;
 
     /** The sequenceId of transaction metadataStore. */
-    public long sequenceId;
+    public long leastSigBits;
 
     /** The low water mark of transaction metadataStore. */
     public long lowWaterMark;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
similarity index 68%
copy from 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
index 63c7846..194f5f38a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
@@ -20,21 +20,12 @@ package org.apache.pulsar.common.policies.data;
 
 import lombok.Data;
 
-/**
- * Transaction coordinator status.
- */
 @Data
-public class TransactionCoordinatorStatus {
-
-    /** The transaction coordinatorId. */
-    public long coordinatorId;
-
-    /** The state of this transaction metadataStore. */
-    public String state;
+public class TransactionInBufferStats {
 
-    /** The sequenceId of transaction metadataStore. */
-    public long sequenceId;
+    /** The start position of this transaction in transaction buffer. */
+    public String startPosition;
 
-    /** The low water mark of transaction metadataStore. */
-    public long lowWaterMark;
+    /** The flag of this transaction have been aborted. */
+    public boolean aborted;
 }
diff --git 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index fdab0bf..78ffb64 100644
--- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -377,10 +377,9 @@ public class MLTransactionMetadataStore
     @Override
     public TransactionCoordinatorStatus getStatus() {
         TransactionCoordinatorStatus transactionCoordinatorStatus = new 
TransactionCoordinatorStatus();
-        transactionCoordinatorStatus.setCoordinatorId(tcID.getId());
         transactionCoordinatorStatus.setLowWaterMark(getLowWaterMark());
         transactionCoordinatorStatus.setState(getState().name());
-        transactionCoordinatorStatus.setSequenceId(sequenceId.get());
+        transactionCoordinatorStatus.setLeastSigBits(sequenceId.get());
         return transactionCoordinatorStatus;
     }
 

Reply via email to