gaoran10 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r905681190


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -342,4 +343,47 @@ public void scaleTransactionCoordinators(@Suspended final 
AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")

Review Comment:
   ```suggestion
       @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}")
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -342,4 +343,47 @@ public void scaleTransactionCoordinators(@Suspended final 
AsyncResponse asyncRes
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this 
broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use 
managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent 
topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse 
asyncResponse,
+                                             @QueryParam("authoritative")
+                                             @DefaultValue("false") boolean 
authoritative,
+                                             @PathParam("tenant") String 
tenant,
+                                             @PathParam("namespace") String 
namespace,
+                                             @PathParam("topic") @Encoded 
String encodedTopic,
+                                             @PathParam("subName") String 
subName,
+                                             @QueryParam("ledgerId") Long 
ledgerId,

Review Comment:
   Maybe we could add the params `ledgerId` and `entryId` in the path, it's 
more rest style, and we don't need to validate them. 



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -613,6 +619,117 @@ public void testUpdateTransactionCoordinatorNumber() 
throws Exception {
         }
     }
 
+
+    @Test
+    public void testCheckPositionInPendingAckState() throws Exception {
+         String topic = "persistent://public/default/test";
+         String subName = "sub";
+         initTransaction(1);
+         Transaction transaction = pulsarClient.newTransaction()
+                 .withTransactionTimeout(5, TimeUnit.SECONDS)
+                 .build()
+                 .get();
+
+         @Cleanup
+         Producer<byte[]> producer = pulsarClient.newProducer()
+                 .sendTimeout(5, TimeUnit.SECONDS)
+                 .enableBatching(false)
+                 .topic(topic)
+                 .create();
+         @Cleanup
+         Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                 .topic(topic)
+                 .subscriptionName(subName)
+                 .subscribe();
+
+         producer.newMessage().send();
+
+         Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+         MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
+
+         PositionInPendingAckStats result = 
admin.transactions().checkPositionInPendingAckState(topic, subName,
+                 messageId.getLedgerId(), messageId.getEntryId(), null);
+        assertEquals(result.state, 
PositionInPendingAckStats.State.PendingAckNotReady);

Review Comment:
   ```suggestion
            assertEquals(result.state, 
PositionInPendingAckStats.State.PendingAckNotReady);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to