This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 445209752a3514d2191b1a1272ed0f16d197007a
Author: Kai Wang <[email protected]>
AuthorDate: Wed Sep 4 19:53:54 2024 +0800

    [improve][broker] Add retry for starting the service unit state channel (ExtensibleLoadManagerImpl only) (#23230)
    
    (cherry picked from commit 8bb30a1106e8bbe5a76c14932a59805a278b9dd4)
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 59 +++++++++++++++++++---
 1 file changed, 53 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 95882cfb21b..40efa6390a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -99,7 +99,10 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.slf4j.Logger;
 
@@ -122,6 +125,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
 
     public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
 
+    public static final int STARTUP_TIMEOUT_SECONDS = 30;
+
+    public static final int MAX_RETRY = 5;
+
     private static final String ELECTION_ROOT = 
"/loadbalance/extension/leader";
 
     public static final Set<String> INTERNAL_TOPICS =
@@ -401,10 +408,43 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
             this.serviceUnitStateChannel.listen(splitManager);
             this.leaderElectionService.start();
             pulsar.runWhenReadyForIncomingRequests(() -> {
-                try {
-                    this.serviceUnitStateChannel.start();
-                } catch (Exception e) {
-                    failStarting(e);
+                Backoff backoff = new BackoffBuilder()
+                        .setInitialTime(100, TimeUnit.MILLISECONDS)
+                        .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                        .create();
+                int retry = 0;
+                while (!Thread.currentThread().isInterrupted()) {
+                    try {
+                        brokerRegistry.register();
+                        this.serviceUnitStateChannel.start();
+                        break;
+                    } catch (Exception e) {
+                        log.warn("The broker:{} failed to start service unit 
state channel. Retrying {} th ...",
+                                pulsar.getBrokerId(), ++retry, e);
+                        try {
+                            Thread.sleep(backoff.next());
+                        } catch (InterruptedException ex) {
+                            log.warn("Interrupted while sleeping.");
+                            // preserve thread's interrupt status
+                            Thread.currentThread().interrupt();
+                            try {
+                                pulsar.close();
+                            } catch (PulsarServerException exc) {
+                                log.error("Failed to close pulsar service.", 
exc);
+                            }
+                            return;
+                        }
+                        failStarting(e);
+                        if (retry >= MAX_RETRY) {
+                            log.error("Failed to start the service unit state 
channel after retry {} th. "
+                                    + "Closing pulsar service.", retry, e);
+                            try {
+                                pulsar.close();
+                            } catch (PulsarServerException ex) {
+                                log.error("Failed to close pulsar service.", 
ex);
+                            }
+                        }
+                    }
                 }
             });
             this.antiAffinityGroupPolicyHelper =
@@ -498,8 +538,15 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                 this.brokerRegistry, ex);
         if (this.brokerRegistry != null) {
             try {
-                brokerRegistry.close();
-            } catch (PulsarServerException e) {
+                brokerRegistry.unregister();
+            } catch (MetadataStoreException e) {
+                // ignore
+            }
+        }
+        if (this.serviceUnitStateChannel != null) {
+            try {
+                serviceUnitStateChannel.close();
+            } catch (IOException e) {
                 // ignore
             }
         }

Reply via email to