congbobo184 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r890765839
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java:
##########
@@ -138,4 +138,33 @@ public boolean hasAckSet() {
public PositionInfo getPositionInfo() {
return
PositionInfo.newBuilder().setLedgerId(ledgerId).setEntryId(entryId).build();
}
+
+ public String getAckSetAsString() {
Review Comment:
This method is only used by tests, so it can be deleted.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger>
getStoreManageLedger() {
}
}
+ @Override
+ public PositionInPendingAckStats
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+ if
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null
&& position.compareTo(
+ (PositionImpl)
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+ } else if (individualAckPositions == null) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+ }
+ MutablePair<PositionImpl, Integer> positionIntegerMutablePair =
individualAckPositions.get(position);
+ if (positionIntegerMutablePair != null) {
+ if (batchIndex == null) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+ } else {
+ BitSetRecyclable bitSetRecyclable = BitSetRecyclable
+ .valueOf(positionIntegerMutablePair.left.getAckSet());
+ if (bitSetRecyclable.get(batchIndex)) {
Review Comment:
A bit value of 0 means the batch index is in pendingAck; a bit value of 1 means it is NotInPendingAck.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger>
getStoreManageLedger() {
}
}
+ @Override
+ public PositionInPendingAckStats
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+ if
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null
&& position.compareTo(
+ (PositionImpl)
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+ } else if (individualAckPositions == null) {
Review Comment:
This should check the pending ack state.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger>
getStoreManageLedger() {
}
}
+ @Override
+ public PositionInPendingAckStats
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+ if
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null
&& position.compareTo(
+ (PositionImpl)
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+ } else if (individualAckPositions == null) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+ }
+ MutablePair<PositionImpl, Integer> positionIntegerMutablePair =
individualAckPositions.get(position);
+ if (positionIntegerMutablePair != null) {
+ if (batchIndex == null) {
+ return new
PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+ } else {
+ BitSetRecyclable bitSetRecyclable = BitSetRecyclable
Review Comment:
This BitSetRecyclable should be recycled after use.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final
AsyncResponse asyncRes
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
+
+ @GET
+
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+ @ApiOperation(value = "Get position stats in pending ack.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic "
+ + "or subscription name doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic is not owned by this
broker!"),
+ @ApiResponse(code = 405, message = "Pending ack handle don't use
managedLedger!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getPositionStatsInPendingAck(@Suspended final AsyncResponse
asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false") boolean
authoritative,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @PathParam("subName") String
subName,
+ @QueryParam("position") String
position) {
+ try {
+ checkArgument(position != null, "Message position should not be
null.");
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ String[] args = position.split(":");
+ int length = args.length;
+ Integer batchIndex = args[2].equals("null") ? null :
Integer.parseInt(args[2]);
Review Comment:
What if args[2] is null? Please check this in internalGetPositionStatsPendingAckStats.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]