congbobo184 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r890765839


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java:
##########
@@ -138,4 +138,33 @@ public boolean hasAckSet() {
     public PositionInfo getPositionInfo() {
         return 
PositionInfo.newBuilder().setLedgerId(ledgerId).setEntryId(entryId).build();
     }
+
+    public String getAckSetAsString() {

Review Comment:
   only for test, this method can delete



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger> 
getStoreManageLedger() {
         }
     }
 
+    @Override
+    public PositionInPendingAckStats 
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        if 
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null 
&& position.compareTo(
+                        (PositionImpl) 
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+            return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+        } else if (individualAckPositions == null) {
+            return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+        MutablePair<PositionImpl, Integer> positionIntegerMutablePair = 
individualAckPositions.get(position);
+        if (positionIntegerMutablePair != null) {
+            if (batchIndex == null) {
+                return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+            } else {
+                BitSetRecyclable bitSetRecyclable = BitSetRecyclable
+                        .valueOf(positionIntegerMutablePair.left.getAckSet());
+                if (bitSetRecyclable.get(batchIndex)) {

Review Comment:
   0 is in pendingAck, 1 is NotInPendingAck



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger> 
getStoreManageLedger() {
         }
     }
 
+    @Override
+    public PositionInPendingAckStats 
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        if 
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null 
&& position.compareTo(
+                        (PositionImpl) 
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+            return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+        } else if (individualAckPositions == null) {

Review Comment:
   should check pending ack state



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -924,6 +925,32 @@ public CompletableFuture<ManagedLedger> 
getStoreManageLedger() {
         }
     }
 
+    @Override
+    public PositionInPendingAckStats 
checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        if 
(persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null 
&& position.compareTo(
+                        (PositionImpl) 
persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+            return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+        } else if (individualAckPositions == null) {
+            return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+        MutablePair<PositionImpl, Integer> positionIntegerMutablePair = 
individualAckPositions.get(position);
+        if (positionIntegerMutablePair != null) {
+            if (batchIndex == null) {
+                return new 
PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+            } else {
+                BitSetRecyclable bitSetRecyclable = BitSetRecyclable

Review Comment:
   should recycle



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final 
AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this 
broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use 
managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse 
asyncResponse,
+                                               @QueryParam("authoritative")
+                                               @DefaultValue("false") boolean 
authoritative,
+                                               @PathParam("tenant") String 
tenant,
+                                               @PathParam("namespace") String 
namespace,
+                                               @PathParam("topic") @Encoded 
String encodedTopic,
+                                               @PathParam("subName") String 
subName,
+                                               @QueryParam("position") String 
position) {
+        try {
+            checkArgument(position != null, "Message position should not be 
null.");
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            String[] args = position.split(":");
+            int length = args.length;
+            Integer batchIndex = args[2].equals("null") ? null : 
Integer.parseInt(args[2]);

Review Comment:
    args[2] == null? check this in internalGetPositionStatsPendingAckStats



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to