RobertIndie commented on code in PR #21853:
URL: https://github.com/apache/pulsar/pull/21853#discussion_r1442678052
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -105,7 +108,62 @@ public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
- client.getLookup().getTopicsUnderNamespace(namespaceName,
subscriptionMode, topicsPattern.pattern(), topicsHash)
+ asyncRecheckTopicsChange().exceptionally(ex -> {
+ log.warn("[{}] Failed to recheck topics change: {}", topic,
ex.getMessage());
+ return null;
+ }).thenAccept(__ -> {
+ // schedule the next re-check task
+ this.recheckPatternTimeout = client.timer()
+ .newTimeout(PatternMultiTopicsConsumerImpl.this,
+ Math.max(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.SECONDS);
+ });
+ }
+
+ private void recheckTopicsChangeRetryIfFailed() {
+ recheckTopicsChangeRetryIfFailed(null);
+ }
+
+ private void recheckTopicsChangeRetryIfFailed(Timeout retryTask) {
+ // This method will be called by A New Call or Timeout scheduled call.
+ final boolean isNew = (retryTask == null);
+ // Skip if closed or the task has been cancelled.
+ if (getState() == State.Closing || getState() == State.Closed
+ || (retryTask != null && retryTask.isCancelled())) {
+ retryRecheckPatternTask.compareAndSet(retryTask, null);
+ return;
+ }
+ // Skip the new check if contains a retry task.
+ Timeout pendingRetryTask = retryRecheckPatternTask.get();
+ if (isNew && pendingRetryTask != null) {
+ return;
+ }
+ // Do check.
+ asyncRecheckTopicsChange().whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to recheck topics change: {}", topic,
ex.getMessage());
+ long delayMs = retryRecheckPatternTaskBackoff.next();
+ Timeout newTask = client.timer().newTimeout(timeout -> {
+ if (timeout.cancel()) {
+ return;
+ }
+ recheckTopicsChangeRetryIfFailed();
Review Comment:
```suggestion
recheckTopicsChangeRetryIfFailed(newTask);
```
But it's weird to pass a Timeout Task to recheckTopicsChangeRetryIfFailed
when retrying. Maybe we should just pass a boolean to indicate it if it's a new
Task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]