poorbarcode commented on code in PR #21853:
URL: https://github.com/apache/pulsar/pull/21853#discussion_r1442706596
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java:
##########
@@ -105,7 +108,62 @@ public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
- client.getLookup().getTopicsUnderNamespace(namespaceName,
subscriptionMode, topicsPattern.pattern(), topicsHash)
+ asyncRecheckTopicsChange().exceptionally(ex -> {
+ log.warn("[{}] Failed to recheck topics change: {}", topic,
ex.getMessage());
+ return null;
+ }).thenAccept(__ -> {
+ // schedule the next re-check task
+ this.recheckPatternTimeout = client.timer()
+ .newTimeout(PatternMultiTopicsConsumerImpl.this,
+ Math.max(1, conf.getPatternAutoDiscoveryPeriod()),
TimeUnit.SECONDS);
+ });
+ }
+
+ private void recheckTopicsChangeRetryIfFailed() {
+ recheckTopicsChangeRetryIfFailed(null);
+ }
+
+ private void recheckTopicsChangeRetryIfFailed(Timeout retryTask) {
+ // This method will be called by A New Call or Timeout scheduled call.
+ final boolean isNew = (retryTask == null);
+ // Skip if closed or the task has been cancelled.
+ if (getState() == State.Closing || getState() == State.Closed
+ || (retryTask != null && retryTask.isCancelled())) {
+ retryRecheckPatternTask.compareAndSet(retryTask, null);
+ return;
+ }
+ // Skip the new check if contains a retry task.
+ Timeout pendingRetryTask = retryRecheckPatternTask.get();
+ if (isNew && pendingRetryTask != null) {
+ return;
+ }
+ // Do check.
+ asyncRecheckTopicsChange().whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Failed to recheck topics change: {}", topic,
ex.getMessage());
+ long delayMs = retryRecheckPatternTaskBackoff.next();
+ Timeout newTask = client.timer().newTimeout(timeout -> {
+ if (timeout.cancel()) {
+ return;
+ }
+ recheckTopicsChangeRetryIfFailed();
Review Comment:
> recheckTopicsChangeRetryIfFailed(newTask);
Fixed.
> But it's weird to pass a Timeout Task to recheckTopicsChangeRetryIfFailed
when retrying. Maybe we should just pass a boolean to indicate it if it's a new
Task.
Ah, this boolean `isNew` is just an additional function of the argument
`Timeout retryTask `. The more important things are below two:
- check if the current task was canceled before
- compare and set
I rewrote the comment, and it would be more easy to read now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]