This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 445209752a3514d2191b1a1272ed0f16d197007a Author: Kai Wang <[email protected]> AuthorDate: Wed Sep 4 19:53:54 2024 +0800 [improve][broker] Add retry for start service unit state channel (ExtensibleLoadManagerImpl only) (#23230) (cherry picked from commit 8bb30a1106e8bbe5a76c14932a59805a278b9dd4) --- .../extensions/ExtensibleLoadManagerImpl.java | 59 +++++++++++++++++++--- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 95882cfb21b..40efa6390a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -99,7 +99,10 @@ import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -122,6 +125,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; + public static final int STARTUP_TIMEOUT_SECONDS = 30; + + public static final int MAX_RETRY = 5; + private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set<String> INTERNAL_TOPICS = @@ -401,10 +408,43 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); pulsar.runWhenReadyForIncomingRequests(() -> { - try { - this.serviceUnitStateChannel.start(); - } catch (Exception e) { - failStarting(e); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .create(); + int retry = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + brokerRegistry.register(); + this.serviceUnitStateChannel.start(); + break; + } catch (Exception e) { + log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", + pulsar.getBrokerId(), ++retry, e); + try { + Thread.sleep(backoff.next()); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); + // preserve thread's interrupt status + Thread.currentThread().interrupt(); + try { + pulsar.close(); + } catch (PulsarServerException exc) { + log.error("Failed to close pulsar service.", exc); + } + return; + } + failStarting(e); + if (retry >= MAX_RETRY) { + log.error("Failed to start the service unit state channel after retry {} th. " + + "Closing pulsar service.", retry, e); + try { + pulsar.close(); + } catch (PulsarServerException ex) { + log.error("Failed to close pulsar service.", ex); + } + } + } } }); this.antiAffinityGroupPolicyHelper = @@ -498,8 +538,15 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS this.brokerRegistry, ex); if (this.brokerRegistry != null) { try { - brokerRegistry.close(); - } catch (PulsarServerException e) { + brokerRegistry.unregister(); + } catch (MetadataStoreException e) { + // ignore + } + } + if (this.serviceUnitStateChannel != null) { + try { + serviceUnitStateChannel.close(); + } catch (IOException e) { // ignore } }
