This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch east_example_akka_cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ed717028d19f40646eb83bd7a8a4a2ea3c597d66
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 1 14:15:50 2019 +0800

    create SimpleIoTDBCluster
---
 iotdb/pom.xml                                      | 10 +++++
 .../apache/iotdb/db/akka/SimpleIoTDBCluster.java   | 36 +++++++++++++++
 .../org/apache/iotdb/db/akka/SimpleIoTDBNode.java  | 52 ++++++++++++++++++++++
 iotdb/src/main/resources/application.conf          | 38 ++++++++++++++++
 4 files changed, 136 insertions(+)

diff --git a/iotdb/pom.xml b/iotdb/pom.xml
index 7ff37e9..71cd609 100644
--- a/iotdb/pom.xml
+++ b/iotdb/pom.xml
@@ -70,6 +70,16 @@
             <artifactId>commons-lang3</artifactId>
             <version>${common.lang3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-tools_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster-metrics_2.12</artifactId>
+            <version>2.5.20</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java 
b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java
new file mode 100644
index 0000000..0b5abf4
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java
@@ -0,0 +1,36 @@
+package org.apache.iotdb.db.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class SimpleIoTDBCluster {
+
+  public static void main(String[] args) {
+    if (args.length == 0) {
+      startup(new String[] { "2551", "2552", "0" });
+    } else {
+      startup(args);
+    }
+  }
+
+  public static void startup(String[] ports) {
+    for (String port : ports) {
+      // Override the configuration of the port
+      // To use artery instead of netty, change to 
"akka.remote.artery.canonical.port"
+      // See https://doc.akka.io/docs/akka/current/remoting-artery.html for 
details
+      Config config = ConfigFactory.parseString(
+          "akka.remote.netty.tcp.port=" + port)
+          .withFallback(ConfigFactory.load());
+
+      // Create an Akka system
+      ActorSystem system = ActorSystem.create("ClusterSystem", config);
+
+      // Create an actor that handles cluster domain events
+      system.actorOf(Props.create(SimpleIoTDBNode.class),
+          "clusterListener");
+
+    }
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java 
b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java
new file mode 100644
index 0000000..a3c35be
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java
@@ -0,0 +1,52 @@
+package org.apache.iotdb.db.akka;
+
+import akka.actor.AbstractActor;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberEvent;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.service.IoTDB;
+
+public class SimpleIoTDBNode extends AbstractActor {
+  LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+  Cluster cluster = Cluster.get(getContext().system());
+  IoTDB ioTDB = IoTDB.getInstance();
+
+  //subscribe to cluster changes
+  @Override
+  public void preStart() {
+    cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
+        MemberEvent.class, UnreachableMember.class);
+    ioTDB.active();
+  }
+
+  //re-subscribe when restart
+  @Override
+  public void postStop() throws FileNodeManagerException {
+    cluster.unsubscribe(self());
+    ioTDB.stop();
+  }
+
+  @Override
+  public Receive createReceive() {
+    return receiveBuilder()
+      .match(MemberUp.class, mUp -> {
+        log.info("Member is Up: {}", mUp.member());
+      })
+      .match(UnreachableMember.class, mUnreachable -> {
+        log.info("Member detected as unreachable: {}", mUnreachable.member());
+      })
+      .match(MemberRemoved.class, mRemoved -> {
+        log.info("Member is Removed: {}", mRemoved.member());
+      })
+      .match(MemberEvent.class, message -> {
+        // ignore
+      })
+      .build();
+  }
+}
diff --git a/iotdb/src/main/resources/application.conf 
b/iotdb/src/main/resources/application.conf
new file mode 100644
index 0000000..2a2ac74
--- /dev/null
+++ b/iotdb/src/main/resources/application.conf
@@ -0,0 +1,38 @@
+akka {
+  actor {
+    provider = "cluster"
+  }
+  remote {
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 0
+    }
+
+    artery {
+      # change this to enabled=on to use Artery instead of netty
+      # see https://doc.akka.io/docs/akka/current/remoting-artery.html
+      enabled = off
+      transport = tcp
+      canonical.hostname = "127.0.0.1"
+      canonical.port = 0
+    }
+  }
+
+  cluster {
+    # Note - Artery uses akka:// addresses
+    seed-nodes = [
+      "akka.tcp://[email protected]:2551",
+      "akka.tcp://[email protected]:2552"]
+
+    # auto downing is NOT safe for production deployments.
+    # you may want to use it during development, read more about it in the 
docs.
+    auto-down-unreachable-after = 10s
+  }
+}
+
+# Enable metrics extension in akka-cluster-metrics.
+akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
+
+# Sigar native library extract location during tests.
+# Note: use per-jvm-instance folder when running multiple jvm on one host. 
+akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

Reply via email to