This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3e42503 [Transaction] Transaction admin api transaction in buffer
stats (#10642)
3e42503 is described below
commit 3e425035bd96071be5ae18cf60dc8af36caabae0
Author: congbo <[email protected]>
AuthorDate: Mon May 24 16:37:26 2021 +0800
[Transaction] Transaction admin api transaction in buffer stats (#10642)
## Motivation
Add a transaction admin API, `getTransactionInBufferStats`.
## Implementation
```
@Data
public class TransactionInBufferStats {
/** The start position of this transaction in transaction buffer. */
public String startPosition;
/** The flag of this transaction have been aborted. */
public boolean aborted;
}
```
These are the transaction buffer stats reported for a transaction.
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 56 +++++++++++++++--
.../pulsar/broker/admin/v3/Transactions.java | 24 ++++++++
.../broker/service/persistent/PersistentTopic.java | 5 ++
.../transaction/buffer/TransactionBuffer.java | 7 +++
.../buffer/impl/InMemTransactionBuffer.java | 6 ++
.../buffer/impl/TopicTransactionBuffer.java | 11 ++++
.../buffer/impl/TransactionBufferDisable.java | 6 ++
.../broker/admin/v3/AdminApiTransactionTest.java | 71 +++++++++++++++-------
.../apache/pulsar/client/admin/Transactions.java | 17 +++++-
.../client/admin/internal/TransactionsImpl.java | 40 +++++++++---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 10 ++-
.../apache/pulsar/admin/cli/CmdTransactions.java | 21 ++++++-
.../data/TransactionCoordinatorStatus.java | 5 +-
...orStatus.java => TransactionInBufferStats.java} | 19 ++----
.../impl/MLTransactionMetadataStore.java | 3 +-
15 files changed, 239 insertions(+), 62 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 044650f..aab750b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,19 +18,27 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
+import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -64,23 +72,23 @@ public abstract class TransactionsBase extends
AdminResource {
return;
}
}
- List<TransactionCoordinatorStatus> metadataStoreInfoList =
new ArrayList<>();
+ Map<Integer, TransactionCoordinatorStatus> status = new
HashMap<>();
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
- for (CompletableFuture<TransactionCoordinatorStatus>
transactionMetadataStoreInfoFuture
- : transactionMetadataStoreInfoFutures) {
+ for (int i = 0; i <
transactionMetadataStoreInfoFutures.size(); i++) {
try {
-
metadataStoreInfoList.add(transactionMetadataStoreInfoFuture.get());
+ status.put(i,
transactionMetadataStoreInfoFutures.get(i).get());
} catch (Exception exception) {
asyncResponse.resume(new
RestException(exception.getCause()));
return;
}
}
- asyncResponse.resume(metadataStoreInfoList);
+
+ asyncResponse.resume(status);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get transaction coordinator
state.", clientAppId(), ex);
@@ -93,4 +101,40 @@ public abstract class TransactionsBase extends
AdminResource {
"This Broker is not configured with
transactionCoordinatorEnabled=true."));
}
}
+
+ protected void internalGetTransactionInBufferStats(AsyncResponse
asyncResponse, boolean authoritative,
+ long mostSigBits, long
leastSigBits,
+ String topic) {
+ if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+ validateTopicOwnership(TopicName.get(topic), authoritative);
+ CompletableFuture<Optional<Topic>> topicFuture =
pulsar().getBrokerService()
+ .getTopics().get(TopicName.get(topic).toString());
+ if (topicFuture != null) {
+ topicFuture.whenComplete((optionalTopic, e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ if (!optionalTopic.isPresent()) {
+ asyncResponse.resume(new
RestException(INTERNAL_SERVER_ERROR,
+ "Topic don't owner by this broker!"));
+ return;
+ }
+ Topic topicObject = optionalTopic.get();
+ if (topicObject instanceof PersistentTopic) {
+ TransactionInBufferStats transactionInBufferStats =
((PersistentTopic) topicObject)
+ .getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits));
+ asyncResponse.resume(transactionInBufferStats);
+ } else {
+ asyncResponse.resume(new
RestException(NOT_IMPLEMENTED, "Topic is not a persistent topic!"));
+ }
+ });
+ } else {
+ asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic don't owner by this broker!"));
+ }
+ } else {
+ asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
+ "This Broker is not configured with
transactionCoordinatorEnabled=true."));
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 1fece00..bad39fe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.v3;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
@@ -53,4 +54,27 @@ public class Transactions extends TransactionsBase {
@QueryParam("coordinatorId") Integer
coordinatorId) {
internalGetCoordinatorStatus(asyncResponse, authoritative,
coordinatorId);
}
+
+ @GET
+ @Path("/transactionInBufferStats")
+ @ApiOperation(value = "Get transaction state in transaction buffer.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic don't owner by this
broker!"),
+ @ApiResponse(code = 501, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getTransactionInBufferStats(@Suspended final AsyncResponse
asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false") boolean
authoritative,
+ @QueryParam("mostSigBits")
+ @ApiParam(value = "Most sig bits
of this transaction", required = true)
+ long mostSigBits,
+ @ApiParam(value = "Least sig bits
of this transaction", required = true)
+ @QueryParam("leastSigBits") long
leastSigBits,
+ @ApiParam(value = "Topic",
required = true)
+ @QueryParam("topic") String topic)
{
+ internalGetTransactionInBufferStats(asyncResponse, authoritative,
mostSigBits, leastSigBits, topic);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7344f59..12d54a8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -132,6 +132,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -3065,6 +3066,10 @@ public class PersistentTopic extends AbstractTopic
return this.transactionBuffer.isTxnAborted(txnID);
}
+ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+ return this.transactionBuffer.getTransactionInBufferStats(txnID);
+ }
+
@Override
protected boolean isTerminated() {
return ledger.isTerminated();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 02e06cc..a201219 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
/**
* A class represent a transaction buffer. The transaction buffer
@@ -147,4 +148,10 @@ public interface TransactionBuffer {
* @return the stable position.
*/
PositionImpl getMaxReadPosition();
+
+ /**
+ * Get transaction in buffer stats.
+ * @return the transaction in buffer stats.
+ */
+ TransactionInBufferStats getTransactionInBufferStats(TxnID txnID);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index f5813fd..42413a4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -41,6 +41,7 @@ import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSeal
import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
@@ -364,4 +365,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
return PositionImpl.latest;
}
+ @Override
+ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+ return null;
+ }
+
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index abf71e9..e62d304 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.jctools.queues.MessagePassingQueue;
@@ -386,6 +387,16 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
@Override
+ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+ TransactionInBufferStats transactionInBufferStats = new
TransactionInBufferStats();
+ transactionInBufferStats.aborted = isTxnAborted(txnID);
+ if (ongoingTxns.containsKey(txnID)) {
+ transactionInBufferStats.startPosition =
ongoingTxns.get(txnID).toString();
+ }
+ return transactionInBufferStats;
+ }
+
+ @Override
public void run(Timeout timeout) {
if (checkIfReady()) {
takeSnapshotByTimeout();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index 26d0d72..9cb52f1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -29,6 +29,7 @@ import
org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
/**
@@ -85,4 +86,9 @@ public class TransactionBufferDisable implements
TransactionBuffer {
public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
}
+
+ @Override
+ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+ return null;
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 793788a..0daa54a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,23 +19,32 @@
package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import
org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
-import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
-
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
@@ -68,32 +77,52 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
getTransaction().abort().get();
TransactionCoordinatorStatus transactionCoordinatorStatus =
admin.transactions().getCoordinatorStatusById(1).get();
- verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
- transactionCoordinatorStatus.state,
- transactionCoordinatorStatus.sequenceId,
transactionCoordinatorStatus.lowWaterMark);
+ verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+ transactionCoordinatorStatus.leastSigBits,
transactionCoordinatorStatus.lowWaterMark);
transactionCoordinatorStatus =
admin.transactions().getCoordinatorStatusById(0).get();
- verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
- transactionCoordinatorStatus.state,
- transactionCoordinatorStatus.sequenceId,
transactionCoordinatorStatus.lowWaterMark);
- List<TransactionCoordinatorStatus> list =
admin.transactions().getCoordinatorStatusList().get();
+ verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+ transactionCoordinatorStatus.leastSigBits,
transactionCoordinatorStatus.lowWaterMark);
+ Map<Integer, TransactionCoordinatorStatus> status =
admin.transactions().getCoordinatorStatus().get();
+
+ assertEquals(status.size(), 2);
+
+ transactionCoordinatorStatus = status.get(0);
+ verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+ transactionCoordinatorStatus.leastSigBits,
transactionCoordinatorStatus.lowWaterMark);
- assertEquals(list.size(), 2);
+ transactionCoordinatorStatus = status.get(1);
+ verifyCoordinatorStatus(transactionCoordinatorStatus.state,
+ transactionCoordinatorStatus.leastSigBits,
transactionCoordinatorStatus.lowWaterMark);
+ }
+
+ @Test(timeOut = 20000)
+ public void testGetTransactionInBufferStats() throws Exception {
+ initTransaction(2);
+ TransactionImpl transaction = (TransactionImpl) getTransaction();
+ final String topic =
"persistent://public/default/testGetTransactionInBufferStats";
+ admin.topics().createNonPartitionedTopic(topic);
+ Producer<byte[]> producer =
pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0,
TimeUnit.SECONDS).create();
+ MessageId messageId = producer.newMessage(transaction).value("Hello
pulsar!".getBytes()).send();
+ TransactionInBufferStats transactionInBufferStats =
admin.transactions()
+ .getTransactionInBufferStats(new
TxnID(transaction.getTxnIdMostBits(),
+ transaction.getTxnIdLeastBits()), topic).get();
+ PositionImpl position =
+ PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
+ assertEquals(transactionInBufferStats.startPosition,
position.toString());
+ assertFalse(transactionInBufferStats.aborted);
- transactionCoordinatorStatus = list.get(0);
- verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
- transactionCoordinatorStatus.state,
- transactionCoordinatorStatus.sequenceId,
transactionCoordinatorStatus.lowWaterMark);
+ transaction.abort().get();
- transactionCoordinatorStatus = list.get(1);
- verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
- transactionCoordinatorStatus.state,
- transactionCoordinatorStatus.sequenceId,
transactionCoordinatorStatus.lowWaterMark);
+ transactionInBufferStats = admin.transactions()
+ .getTransactionInBufferStats(new
TxnID(transaction.getTxnIdMostBits(),
+ transaction.getTxnIdLeastBits()), topic).get();
+ assertNull(transactionInBufferStats.startPosition);
+ assertTrue(transactionInBufferStats.aborted);
}
- private static void verifyCoordinatorStatus(long expectedCoordinatorId,
long coordinatorId, String state,
+ private static void verifyCoordinatorStatus(String state,
long sequenceId, long
lowWaterMark) {
- assertEquals(coordinatorId, expectedCoordinatorId);
assertEquals(state, "Ready");
assertEquals(sequenceId, 0);
assertEquals(lowWaterMark, 0);
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 2719796..147ffcc 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.client.admin;
-import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
public interface Transactions {
@@ -35,8 +37,17 @@ public interface Transactions {
/**
* Get transaction metadataStore status.
*
- * @return the list future of transaction metadata store status.
+ * @return the map future of transaction metadata store status.
+ */
+ CompletableFuture<Map<Integer, TransactionCoordinatorStatus>>
getCoordinatorStatus();
+
+ /**
+ * Get transaction in buffer stats.
+ *
+ * @param txnID the txnId
+ * @param topic the produce topic
+ * @return the future stats of transaction in buffer.
*/
- CompletableFuture<List<TransactionCoordinatorStatus>>
getCoordinatorStatusList();
+ CompletableFuture<TransactionInBufferStats>
getTransactionInBufferStats(TxnID txnID, String topic);
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 3779ac3..43c5ccf 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.client.admin.internal;
-import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
@@ -55,22 +57,44 @@ public class TransactionsImpl extends BaseResource
implements Transactions {
}
@Override
- public CompletableFuture<List<TransactionCoordinatorStatus>>
getCoordinatorStatusList() {
+ public CompletableFuture<Map<Integer, TransactionCoordinatorStatus>>
getCoordinatorStatus() {
WebTarget path = adminV3Transactions.path("coordinatorStatus");
- final CompletableFuture<List<TransactionCoordinatorStatus>> statusList
= new CompletableFuture<>();
+ final CompletableFuture<Map<Integer, TransactionCoordinatorStatus>>
status = new CompletableFuture<>();
asyncGetRequest(path,
- new InvocationCallback<List<TransactionCoordinatorStatus>>() {
+ new InvocationCallback<Map<Integer,
TransactionCoordinatorStatus>>() {
@Override
- public void completed(List<TransactionCoordinatorStatus>
topics) {
- statusList.complete(topics);
+ public void completed(Map<Integer,
TransactionCoordinatorStatus> topics) {
+ status.complete(topics);
}
@Override
public void failed(Throwable throwable) {
-
statusList.completeExceptionally(getApiException(throwable.getCause()));
+
status.completeExceptionally(getApiException(throwable.getCause()));
}
});
- return statusList;
+ return status;
+ }
+
+ @Override
+ public CompletableFuture<TransactionInBufferStats>
getTransactionInBufferStats(TxnID txnID, String topic) {
+ WebTarget path = adminV3Transactions.path("transactionInBufferStats");
+ path = path.queryParam("mostSigBits", txnID.getMostSigBits());
+ path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
+ path = path.queryParam("topic", topic);
+ final CompletableFuture<TransactionInBufferStats> future = new
CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<TransactionInBufferStats>() {
+ @Override
+ public void completed(TransactionInBufferStats stats) {
+ future.complete(stats);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
}
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9a83204..b5356bc 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -48,7 +48,6 @@ import
org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.Lookup;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.NonPersistentTopics;
-import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.ProxyStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ResourceQuotas;
@@ -60,6 +59,7 @@ import
org.apache.pulsar.client.admin.internal.OffloadProcessStatusImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
@@ -1421,7 +1421,7 @@ public class PulsarAdminToolTest {
CompletableFuture<List<TransactionCoordinatorStatus>> lists =
mock(CompletableFuture.class);
doReturn(transactions).when(admin).transactions();
doReturn(transactionMetadataStoreInfo).when(transactions).getCoordinatorStatusById(1);
- doReturn(lists).when(transactions).getCoordinatorStatusList();
+ doReturn(lists).when(transactions).getCoordinatorStatus();
CmdTransactions cmdTransactions = new CmdTransactions(() -> admin);
@@ -1430,7 +1430,11 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("coordinator-status"));
- verify(transactions).getCoordinatorStatusList();
+ verify(transactions).getCoordinatorStatus();
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("transaction-in-buffer-stats -m 1 -t test -l
2"));
+ verify(transactions).getTransactionInBufferStats(new TxnID(1, 2),
"test");
}
String[] split(String s) {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index f1a133c..9ede39c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.transaction.TxnID;
@Parameters(commandDescription = "Operations on transactions")
public class CmdTransactions extends CmdBase {
@@ -36,13 +37,31 @@ public class CmdTransactions extends CmdBase {
if (coordinatorId != null) {
print(getAdmin().transactions().getCoordinatorStatusById(coordinatorId));
} else {
- print(getAdmin().transactions().getCoordinatorStatusList());
+ print(getAdmin().transactions().getCoordinatorStatus());
}
}
}
+ @Parameters(commandDescription = "Get transaction in buffer stats")
+ private class GetTransactionInBufferStats extends CliCommand {
+ @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most
sig bits", required = true)
+ private int mostSigBits;
+
+ @Parameter(names = {"-l", "--least-sig-bits"}, description = "the
least sig bits", required = true)
+ private long leastSigBits;
+
+ @Parameter(names = {"-t", "--topic"}, description = "the topic",
required = true)
+ private String topic;
+
+ @Override
+ void run() throws Exception {
+ print(getAdmin().transactions().getTransactionInBufferStats(new
TxnID(mostSigBits, leastSigBits), topic));
+ }
+ }
+
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
jcommander.addCommand("coordinator-status", new
GetCoordinatorStatus());
+ jcommander.addCommand("transaction-in-buffer-stats", new
GetTransactionInBufferStats());
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
index 63c7846..033a66a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
@@ -26,14 +26,11 @@ import lombok.Data;
@Data
public class TransactionCoordinatorStatus {
- /** The transaction coordinatorId. */
- public long coordinatorId;
-
/** The state of this transaction metadataStore. */
public String state;
/** The sequenceId of transaction metadataStore. */
- public long sequenceId;
+ public long leastSigBits;
/** The low water mark of transaction metadataStore. */
public long lowWaterMark;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
similarity index 68%
copy from
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
index 63c7846..194f5f38a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStatus.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionInBufferStats.java
@@ -20,21 +20,12 @@ package org.apache.pulsar.common.policies.data;
import lombok.Data;
-/**
- * Transaction coordinator status.
- */
@Data
-public class TransactionCoordinatorStatus {
-
- /** The transaction coordinatorId. */
- public long coordinatorId;
-
- /** The state of this transaction metadataStore. */
- public String state;
+public class TransactionInBufferStats {
- /** The sequenceId of transaction metadataStore. */
- public long sequenceId;
+ /** The start position of this transaction in transaction buffer. */
+ public String startPosition;
- /** The low water mark of transaction metadataStore. */
- public long lowWaterMark;
+ /** The flag of this transaction have been aborted. */
+ public boolean aborted;
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index fdab0bf..78ffb64 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -377,10 +377,9 @@ public class MLTransactionMetadataStore
@Override
public TransactionCoordinatorStatus getStatus() {
TransactionCoordinatorStatus transactionCoordinatorStatus = new
TransactionCoordinatorStatus();
- transactionCoordinatorStatus.setCoordinatorId(tcID.getId());
transactionCoordinatorStatus.setLowWaterMark(getLowWaterMark());
transactionCoordinatorStatus.setState(getState().name());
- transactionCoordinatorStatus.setSequenceId(sequenceId.get());
+ transactionCoordinatorStatus.setLeastSigBits(sequenceId.get());
return transactionCoordinatorStatus;
}