[
https://issues.apache.org/jira/browse/STORM-166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13962121#comment-13962121
]
ASF GitHub Bot commented on STORM-166:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/61#discussion_r11358591
--- Diff: storm-core/src/jvm/backtype/storm/nimbus/NimbusLeadership.java ---
@@ -0,0 +1,104 @@
+package backtype.storm.nimbus;
+
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.recipes.locks.InterProcessMutex;
+import com.netflix.curator.utils.ZKPaths;
+
+@SuppressWarnings("rawtypes")
+public class NimbusLeadership {
+
+ private static final String STORM_NIMBUS_LEADERSHIP_PATH =
"/nimbus/leadership";
+
+ private Map conf;
+ private CuratorFramework curator;
+ private InterProcessMutex mutex;
+ private boolean isLeader = false;
+
+ public NimbusLeadership(final Map conf) {
+ this.conf = conf;
+ }
+
+ public void acquireLeaderShip() throws Exception {
+ String nimbusHostName =
InetAddress.getLocalHost().getCanonicalHostName();
+ Object nimbusPort = conf.get(Config.NIMBUS_THRIFT_PORT);
+ String nodeId = nimbusHostName + ":" + nimbusPort.toString();
+ initCurator();
+ initLeadershipMutex(nodeId);
+ mutex.acquire();
+ isLeader = true;
+ }
+
+ public InetSocketAddress getNimbusLeaderAddress() throws Exception {
+ InetSocketAddress leaderAddress = null;
+ initCurator();
+ initLeadershipMutex(null);
+ Collection<String> nimbusNodesPath =
mutex.getParticipantNodes();
+ if (nimbusNodesPath.size() > 0) {
+ leaderAddress =
parseAddress(nimbusNodesPath.iterator().next());
+ }
+ close();
+ return leaderAddress;
+ }
+
+ public List<InetSocketAddress> getNimbusHosts() throws Exception {
+ List<InetSocketAddress> nimbusAddressList = new
ArrayList<InetSocketAddress>();
+ initCurator();
+ initLeadershipMutex(null);
+ Collection<String> nimbusNodesPath =
mutex.getParticipantNodes();
+ for (String nimbusNodePath : nimbusNodesPath) {
+ nimbusAddressList.add(parseAddress(nimbusNodePath));
+ }
+ close();
+ return nimbusAddressList;
+ }
+
+ public void close() {
--- End diff --
The spacing appears to be off here compared to the rest of the file.
> Highly available Nimbus
> -----------------------
>
> Key: STORM-166
> URL: https://issues.apache.org/jira/browse/STORM-166
> Project: Apache Storm (Incubating)
> Issue Type: New Feature
> Reporter: James Xu
> Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/360
> The goal of this feature is to be able to run multiple Nimbus servers so that
> if one goes down another one will transparently take over. Here's what needs
> to happen to implement this:
> 1. Everything currently stored on local disk on Nimbus needs to be stored in
> a distributed and reliable fashion. A DFS is perfect for this. However, as we
> do not want to make a DFS a mandatory requirement to run Storm, the storage
> of these artifacts should be pluggable (default to local filesystem, but the
> interface should support DFS). You would only be able to run multiple NImbus
> if you use the right storage, and the storage interface chosen should have a
> flag indicating whether it's suitable for HA mode or not. If you choose local
> storage and try to run multiple Nimbus, one of the Nimbus's should fail to
> launch.
> 2. Nimbus's should register themselves in Zookeeper. They should use a leader
> election protocol to decide which one is currently responsible for launching
> and monitoring topologies.
> 3. StormSubmitter should find the Nimbus to connect to via Zookeeper. In case
> the leader changes during submission, it should use a retry protocol to try
> reconnecting to the new leader and attempting submission again.
--
This message was sent by Atlassian JIRA
(v6.2#6252)