congbobo184 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r887626615


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final 
AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this 
broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use 
managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse 
asyncResponse,
+                                               @QueryParam("authoritative")
+                                               @DefaultValue("false") boolean 
authoritative,
+                                               @PathParam("tenant") String 
tenant,
+                                               @PathParam("namespace") String 
namespace,
+                                               @PathParam("topic") @Encoded 
String encodedTopic,
+                                               @PathParam("subName") String 
subName,
+                                               @QueryParam("position") String 
position) {
+        try {
+            checkArgument(position != null, "Message position should not be 
null.");
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            PositionImpl positionImpl = 
PositionImpl.parsePositionFromString(position);
+            internalGetPositionStatsPendingAckStats(authoritative, subName, 
positionImpl)
+                    .thenAccept(positionInPendingAckStats -> 
asyncResponse.resume(positionInPendingAckStats))
+                    .exceptionally(ex -> {
+                        log.warn("{} Failed to check position [{}] stats for 
topic [{}], subscription [{}]",
+                                clientAppId(), position, topicName, subName, 
ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            log.warn("Failed to get position stats in pending ack");
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
+
+
+

Review Comment:
   delete



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final 
AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this 
broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use 
managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse 
asyncResponse,

Review Comment:
   checkPositionInPendingAckState



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java:
##########
@@ -317,10 +318,10 @@ public 
CompletableFuture<TransactionPendingAckInternalStats> getPendingAckIntern
         path = path.queryParam("metadata", metadata);
         final CompletableFuture<TransactionPendingAckInternalStats> future = 
new CompletableFuture<>();
         asyncGetRequest(path,
-                new InvocationCallback<TransactionPendingAckInternalStats>() {
+                new InvocationCallback<PositionInPendingAckStats>() {

Review Comment:
   why change this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to