This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8bb30a1106e [improve][broker] Add retry for start service unit state channel (ExtensibleLoadManagerImpl only) (#23230)
8bb30a1106e is described below
commit 8bb30a1106e8bbe5a76c14932a59805a278b9dd4
Author: Kai Wang <[email protected]>
AuthorDate: Wed Sep 4 19:53:54 2024 +0800
[improve][broker] Add retry for start service unit state channel (ExtensibleLoadManagerImpl only) (#23230)
---
.../extensions/ExtensibleLoadManagerImpl.java | 59 +++++++++++++++++++---
1 file changed, 53 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 95882cfb21b..40efa6390a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -99,7 +99,10 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.Backoff;
+import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;
@@ -122,6 +125,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
+ public static final int STARTUP_TIMEOUT_SECONDS = 30;
+
+ public static final int MAX_RETRY = 5;
+
private static final String ELECTION_ROOT =
"/loadbalance/extension/leader";
public static final Set<String> INTERNAL_TOPICS =
@@ -401,10 +408,43 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
pulsar.runWhenReadyForIncomingRequests(() -> {
- try {
- this.serviceUnitStateChannel.start();
- } catch (Exception e) {
- failStarting(e);
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .create();
+ int retry = 0;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ brokerRegistry.register();
+ this.serviceUnitStateChannel.start();
+ break;
+ } catch (Exception e) {
+ log.warn("The broker:{} failed to start service unit
state channel. Retrying {} th ...",
+ pulsar.getBrokerId(), ++retry, e);
+ try {
+ Thread.sleep(backoff.next());
+ } catch (InterruptedException ex) {
+ log.warn("Interrupted while sleeping.");
+ // preserve thread's interrupt status
+ Thread.currentThread().interrupt();
+ try {
+ pulsar.close();
+ } catch (PulsarServerException exc) {
+ log.error("Failed to close pulsar service.",
exc);
+ }
+ return;
+ }
+ failStarting(e);
+ if (retry >= MAX_RETRY) {
+ log.error("Failed to start the service unit state
channel after retry {} th. "
+ + "Closing pulsar service.", retry, e);
+ try {
+ pulsar.close();
+ } catch (PulsarServerException ex) {
+ log.error("Failed to close pulsar service.",
ex);
+ }
+ }
+ }
}
});
this.antiAffinityGroupPolicyHelper =
@@ -498,8 +538,15 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
this.brokerRegistry, ex);
if (this.brokerRegistry != null) {
try {
- brokerRegistry.close();
- } catch (PulsarServerException e) {
+ brokerRegistry.unregister();
+ } catch (MetadataStoreException e) {
+ // ignore
+ }
+ }
+ if (this.serviceUnitStateChannel != null) {
+ try {
+ serviceUnitStateChannel.close();
+ } catch (IOException e) {
// ignore
}
}