This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ccfb949 When the loadmanager leader is not available, fall through
regular least loaded selection (#3688)
ccfb949 is described below
commit ccfb94970bd26d7e9b4f3c819bba258f148773e4
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Feb 28 21:53:04 2019 -0800
When the loadmanager leader is not available, fall through regular least
loaded selection (#3688)
* When the loadmanager leader is not available, fall through regular least
loaded selection
* Handle exceptions coming from mock zk in tests
---
.../broker/loadbalance/LeaderElectionService.java | 17 +++++++++++++++++
.../pulsar/broker/namespace/NamespaceService.java | 11 ++++++++---
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index 799c7af..712b234 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
@@ -179,6 +180,22 @@ public class LeaderElectionService {
if (stopped) {
return;
}
+
+ if (isLeader()) {
+ // Make sure to remove the leader election z-node in case the
session doesn't
+ // get closed properly. This is to avoid having to wait for the
session timeout
+ // to expire before a new leader can be elected.
+ // This delete operation is safe to do here (with version=-1)
because either:
+ // 1. The ZK session is still valid, in which case this broker is
still
+ // the "leader" and we have to remove the z-node
+ // 2. The session has already expired, in which case this delete
operation
+ // will not go through
+ try {
+ pulsar.getLocalZkCache().getZooKeeper().delete(ELECTION_ROOT,
-1);
+ } catch (Throwable t) {
+ log.warn("Failed to cleanup election root znode: {}", t);
+ }
+ }
stopped = true;
log.info("LeaderElectionService stopped");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 9b4fcae..17fa025 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -373,7 +373,12 @@ public class NamespaceService {
}
if (candidateBroker == null) {
- if (!this.loadManager.get().isCentralized() ||
pulsar.getLeaderElectionService().isLeader()) {
+ if (!this.loadManager.get().isCentralized()
+ || pulsar.getLeaderElectionService().isLeader()
+
+ // If leader is not active, fallback to pick the least
loaded from current broker loadmanager
+ ||
!isBrokerActive(pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl())
+ ) {
Optional<String> availableBroker =
getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
lookupFuture.complete(Optional.empty());
@@ -977,7 +982,7 @@ public class NamespaceService {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
- }
+ }
return String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(),
host, port);
}
public static String getSLAMonitorNamespace(String host,
ServiceConfiguration config) {
@@ -986,7 +991,7 @@ public class NamespaceService {
port = config.getWebServicePort().get();
} else if (config.getWebServicePortTls().isPresent()) {
port = config.getWebServicePortTls().get();
- }
+ }
return String.format(SLA_NAMESPACE_FMT, config.getClusterName(), host,
port);
}