This is an automated email from the ASF dual-hosted git repository. east pushed a commit to branch east_example_akka_cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ed717028d19f40646eb83bd7a8a4a2ea3c597d66 Author: mdf369 <[email protected]> AuthorDate: Fri Mar 1 14:15:50 2019 +0800 create SimpleIoTDBCluster --- iotdb/pom.xml | 10 +++++ .../apache/iotdb/db/akka/SimpleIoTDBCluster.java | 36 +++++++++++++++ .../org/apache/iotdb/db/akka/SimpleIoTDBNode.java | 52 ++++++++++++++++++++++ iotdb/src/main/resources/application.conf | 38 ++++++++++++++++ 4 files changed, 136 insertions(+) diff --git a/iotdb/pom.xml b/iotdb/pom.xml index 7ff37e9..71cd609 100644 --- a/iotdb/pom.xml +++ b/iotdb/pom.xml @@ -70,6 +70,16 @@ <artifactId>commons-lang3</artifactId> <version>${common.lang3.version}</version> </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-tools_2.12</artifactId> + <version>2.5.20</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-metrics_2.12</artifactId> + <version>2.5.20</version> + </dependency> </dependencies> <build> <plugins> diff --git a/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java new file mode 100644 index 0000000..0b5abf4 --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBCluster.java @@ -0,0 +1,36 @@ +package org.apache.iotdb.db.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class SimpleIoTDBCluster { + + public static void main(String[] args) { + if (args.length == 0) { + startup(new String[] { "2551", "2552", "0" }); + } else { + startup(args); + } + } + + public static void startup(String[] ports) { + for (String port : ports) { + // Override the configuration of the port + // To use artery instead of netty, change to "akka.remote.artery.canonical.port" + // See https://doc.akka.io/docs/akka/current/remoting-artery.html for details + Config config = ConfigFactory.parseString( + 
"akka.remote.netty.tcp.port=" + port) + .withFallback(ConfigFactory.load()); + + // Create an Akka system + ActorSystem system = ActorSystem.create("ClusterSystem", config); + + // Create an actor that handles cluster domain events + system.actorOf(Props.create(SimpleIoTDBNode.class), + "clusterListener"); + + } + } +} diff --git a/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java new file mode 100644 index 0000000..a3c35be --- /dev/null +++ b/iotdb/src/main/java/org/apache/iotdb/db/akka/SimpleIoTDBNode.java @@ -0,0 +1,52 @@ +package org.apache.iotdb.db.akka; + +import akka.actor.AbstractActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberRemoved; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.UnreachableMember; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import org.apache.iotdb.db.exception.FileNodeManagerException; +import org.apache.iotdb.db.service.IoTDB; + +public class SimpleIoTDBNode extends AbstractActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + Cluster cluster = Cluster.get(getContext().system()); + IoTDB ioTDB = IoTDB.getInstance(); + + //subscribe to cluster changes + @Override + public void preStart() { + cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(), + MemberEvent.class, UnreachableMember.class); + ioTDB.active(); + } + + //re-subscribe when restart + @Override + public void postStop() throws FileNodeManagerException { + cluster.unsubscribe(self()); + ioTDB.stop(); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(MemberUp.class, mUp -> { + log.info("Member is Up: {}", mUp.member()); + }) + .match(UnreachableMember.class, mUnreachable -> { + log.info("Member detected as unreachable: {}", mUnreachable.member()); + }) + 
.match(MemberRemoved.class, mRemoved -> { + log.info("Member is Removed: {}", mRemoved.member()); + }) + .match(MemberEvent.class, message -> { + // ignore + }) + .build(); + } +} diff --git a/iotdb/src/main/resources/application.conf b/iotdb/src/main/resources/application.conf new file mode 100644 index 0000000..2a2ac74 --- /dev/null +++ b/iotdb/src/main/resources/application.conf @@ -0,0 +1,38 @@ +akka { + actor { + provider = "cluster" + } + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + + artery { + # change this to enabled=on to use Artery instead of netty + # see https://doc.akka.io/docs/akka/current/remoting-artery.html + enabled = off + transport = tcp + canonical.hostname = "127.0.0.1" + canonical.port = 0 + } + } + + cluster { + # Note - Artery uses akka:// addresses + seed-nodes = [ + "akka.tcp://ClusterSystem@127.0.0.1:2551", + "akka.tcp://ClusterSystem@127.0.0.1:2552"] + + # auto downing is NOT safe for production deployments. + # you may want to use it during development, read more about it in the docs. + auto-down-unreachable-after = 10s + } +} + +# Enable metrics extension in akka-cluster-metrics. +akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"] + +# Sigar native library extract location during tests. +# Note: use per-jvm-instance folder when running multiple jvm on one host. +akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
