oneby-wang commented on code in PR #25127:
URL: https://github.com/apache/pulsar/pull/25127#discussion_r2710604267
##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -1591,6 +1604,94 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture<AnalyzeSubscriptionBacklogResult>
analyzeSubscriptionBacklogAsync(String topic,
+
String subscriptionName,
+
Optional<MessageId> startPosition,
+
long backlogScanMaxEntries) {
+ final CompletableFuture<AnalyzeSubscriptionBacklogResult> future = new
CompletableFuture<>();
+ AtomicReference<AnalyzeSubscriptionBacklogResult> resultRef = new
AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference<Optional<MessageId>> startPositionRef = new
AtomicReference<>(startPosition);
+
+ Supplier<CompletableFuture<AnalyzeSubscriptionBacklogResult>>
resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName,
startPositionRef.get());
+ BiConsumer<AnalyzeSubscriptionBacklogResult, Throwable> completeAction
= new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult,
Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult =
mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >=
backlogScanMaxEntries) {
Review Comment:
> the !mergedResult.isAborted() part isn't correct.
In `PersistentTopicsBase`, we convert `rawResult.getScanOutcome() !=
ScanOutcome.COMPLETED` to `boolean aborted`, and the orginal `scanOutcome`
result is not returned back to the client, so I used `!aborted` to replace
`scanOutcome == ScanOutcome.COMPLETED`.
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L1704-L1709
`OpScan` would only completes with `ScanOutcome.COMPLETED` if there is no
more entries to scan.
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L97-L101
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L129-L135
Only this if block returns the result of ScanOutcome.USER_INTERRUPTED.
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java#L84-L88
In `analyzeBacklog()` method, we always return `true`, so the `scanOutcome`
is `ScanOutcome.COMPLETED` or `ScanOutcome.ABORTED`.
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L657-L710
Should I add a field in `AnalyzeSubscriptionBacklogResult` to return the
orginal `scanOutcome` result?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]