lhotari commented on code in PR #25188:
URL: https://github.com/apache/pulsar/pull/25188#discussion_r2737783770
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -136,46 +136,113 @@ public void run(Timeout timeout) throws Exception {
}
CompletableFuture<Void> recheckTopicsChange() {
- String pattern = topicsPattern.inputPattern();
- final int epoch = recheckPatternEpoch.incrementAndGet();
- return client.getLookup().getTopicsUnderNamespace(namespaceName,
subscriptionMode, pattern, topicsHash)
- .thenCompose(getTopicsResult -> {
- // If "recheckTopicsChange" has been called more than one
times, only make the last one take affects.
- // Use "synchronized (recheckPatternTaskBackoff)" instead of
- // `synchronized(PatternMultiTopicsConsumerImpl.this)` to
avoid locking in a wider range.
- synchronized (PatternMultiTopicsConsumerImpl.this) {
- if (recheckPatternEpoch.get() > epoch) {
- return CompletableFuture.completedFuture(null);
- }
- if (log.isDebugEnabled()) {
- log.debug("Pattern consumer [{}] get topics under
namespace {}, topics.size: {},"
- + " topicsHash: {}, filtered: {}",
-
PatternMultiTopicsConsumerImpl.this.getSubscription(),
- namespaceName,
getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
- getTopicsResult.isFiltered());
- getTopicsResult.getTopics().forEach(topicName ->
- log.debug("Get topics under namespace {},
topic: {}", namespaceName, topicName));
- }
+ final int epoch = getNextRecheckPatternEpoch();
- final List<String> oldTopics = new
ArrayList<>(getPartitions());
- return updateSubscriptions(topicsPattern,
this::setTopicsHash, getTopicsResult,
- topicsChangeListener, oldTopics, subscription);
+ CompletableFuture<Void> recheckFuture;
+ // Prefer watcher-based reconcile when a watcher exists and is
connected. Fallback to lookup if watcher
+ // is not available or the watcher-based request fails.
+ if (supportsTopicListWatcherReconcile()) {
+ String localStateTopicsHash = getLocalStateTopicsHash();
+ recheckFuture =
topicListWatcher.reconcile(localStateTopicsHash).thenCompose(response -> {
+ return handleWatchTopicListSuccess(response,
localStateTopicsHash, epoch);
+ }).handle((res, ex) -> {
+ if (ex != null) {
+ // watcher-based reconcile failed -> fall back to
lookup-based recheck
+ return doLookupBasedRecheck(epoch);
+ } else {
+ // watcher-based reconcile completed successfully
+ return CompletableFuture.<Void>completedFuture(null);
}
- }).thenAccept(__ -> {
- if (recheckPatternTimeout != null) {
- this.recheckPatternTimeout = client.timer().newTimeout(
- this, Math.max(1,
conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
- }
- });
+ }).thenCompose(Function.identity());
+ } else {
+ // Fallback: perform the existing lookup-based recheck
+ recheckFuture = doLookupBasedRecheck(epoch);
+ }
+
+ return recheckFuture.handle((__, ex) -> {
Review Comment:
adding logging.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]