congbobo184 commented on code in PR #15682:
URL: https://github.com/apache/pulsar/pull/15682#discussion_r887626615
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final
AsyncResponse asyncRes
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
+
+ @GET
+
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+ @ApiOperation(value = "Get position stats in pending ack.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic "
+ + "or subscription name doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic is not owned by this
broker!"),
+ @ApiResponse(code = 405, message = "Pending ack handle don't use
managedLedger!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getPositionStatsInPendingAck(@Suspended final AsyncResponse
asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false") boolean
authoritative,
+ @PathParam("tenant") String
tenant,
+ @PathParam("namespace") String
namespace,
+ @PathParam("topic") @Encoded
String encodedTopic,
+ @PathParam("subName") String
subName,
+ @QueryParam("position") String
position) {
+ try {
+ checkArgument(position != null, "Message position should not be
null.");
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ PositionImpl positionImpl =
PositionImpl.parsePositionFromString(position);
+ internalGetPositionStatsPendingAckStats(authoritative, subName,
positionImpl)
+ .thenAccept(positionInPendingAckStats ->
asyncResponse.resume(positionInPendingAckStats))
+ .exceptionally(ex -> {
+ log.warn("{} Failed to check position [{}] stats for
topic [{}], subscription [{}]",
+ clientAppId(), position, topicName, subName,
ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ log.warn("Failed to get position stats in pending ack");
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+
+
+
Review Comment:
delete
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -339,4 +340,47 @@ public void scaleTransactionCoordinators(@Suspended final
AsyncResponse asyncRes
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
+
+ @GET
+
@Path("/getPositionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}")
+ @ApiOperation(value = "Get position stats in pending ack.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace
or topic "
+ + "or subscription name doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic is not owned by this
broker!"),
+ @ApiResponse(code = 405, message = "Pending ack handle don't use
managedLedger!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent
topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getPositionStatsInPendingAck(@Suspended final AsyncResponse
asyncResponse,
Review Comment:
checkPositionInPendingAckState
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java:
##########
@@ -317,10 +318,10 @@ public
CompletableFuture<TransactionPendingAckInternalStats> getPendingAckIntern
path = path.queryParam("metadata", metadata);
final CompletableFuture<TransactionPendingAckInternalStats> future =
new CompletableFuture<>();
asyncGetRequest(path,
- new InvocationCallback<TransactionPendingAckInternalStats>() {
+ new InvocationCallback<PositionInPendingAckStats>() {
Review Comment:
why change this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]