Unique values test passes with new "actor system" module
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ee18950f Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ee18950f Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ee18950f Branch: refs/heads/release-2.1.1 Commit: ee18950f6e605dc114257b4678802ba80039e959 Parents: 97aec4d Author: Dave Johnson <[email protected]> Authored: Tue Jun 21 08:02:26 2016 -0400 Committer: Dave Johnson <[email protected]> Committed: Tue Jun 21 08:02:26 2016 -0400 ---------------------------------------------------------------------- stack/core/pom.xml | 821 ++++++++++--------- .../corepersistence/CpEntityManager.java | 8 +- .../corepersistence/CpEntityManagerFactory.java | 12 +- .../service/ApplicationServiceImpl.java | 8 +- stack/corepersistence/actorsystem/pom.xml | 99 +++ .../persistence/actorsystem/ActorSystemFig.java | 116 +++ .../actorsystem/ActorSystemManager.java | 39 + .../actorsystem/ActorSystemManagerImpl.java | 398 +++++++++ .../actorsystem/ActorSystemModule.java | 34 + .../persistence/actorsystem/ClientActor.java | 205 +++++ .../actorsystem/GuiceActorProducer.java | 46 ++ .../persistence/actorsystem/RouterProducer.java | 45 + .../src/main/resources/application.conf | 28 + .../src/main/resources/cluster-singleton.conf | 25 + .../actorsystem/ActorServiceServiceTest.java | 57 ++ stack/corepersistence/collection/pom.xml | 22 + .../collection/EntityCollectionManager.java | 6 - .../collection/guice/CollectionModule.java | 4 +- .../EntityCollectionManagerFactoryImpl.java | 5 + .../impl/EntityCollectionManagerImpl.java | 23 +- .../mvcc/stage/write/WriteCommit.java | 12 +- .../mvcc/stage/write/WriteUniqueVerify.java | 12 +- .../collection/uniquevalues/AkkaFig.java | 116 --- .../uniquevalues/ClusterSingletonRouter.java | 70 -- .../uniquevalues/GuiceActorProducer.java | 46 -- .../collection/uniquevalues/RequestActor.java | 186 ----- .../uniquevalues/ReservationCache.java | 3 +- 
.../uniquevalues/UniqueValueActor.java | 2 +- .../uniquevalues/UniqueValuesRouter.java | 70 ++ .../uniquevalues/UniqueValuesService.java | 20 +- .../uniquevalues/UniqueValuesServiceImpl.java | 442 ++-------- .../uniquevalues/UniqueValuesTableImpl.java | 9 +- .../src/main/resources/application.conf | 28 - .../src/main/resources/cluster-singleton.conf | 25 - .../collection/EntityCollectionManagerIT.java | 20 +- .../mvcc/stage/write/WriteUniqueVerifyIT.java | 19 +- .../mvcc/stage/write/WriteUniqueVerifyTest.java | 19 +- .../uniquevalues/UniqueValuesServiceTest.java | 38 +- stack/corepersistence/common/pom.xml | 17 +- stack/corepersistence/model/pom.xml | 16 + stack/corepersistence/pom.xml | 1 + stack/corepersistence/queryindex/pom.xml | 18 +- 42 files changed, 1835 insertions(+), 1355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index 090f46b..9c6497e 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -17,30 +17,30 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid</artifactId> - <version>2.1.1-SNAPSHOT</version> - <relativePath>../</relativePath> - </parent> - - <artifactId>usergrid-core</artifactId> - <name>Usergrid Core</name> - <description>Core services for Usergrid system.</description> - <packaging>jar</packaging> - - <reporting> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - </plugins> - </reporting> - - - <build> + <modelVersion>4.0.0</modelVersion> + <parent> + 
<groupId>org.apache.usergrid</groupId> + <artifactId>usergrid</artifactId> + <version>2.1.1-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + + <artifactId>usergrid-core</artifactId> + <name>Usergrid Core</name> + <description>Core services for Usergrid system.</description> + <packaging>jar</packaging> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + </plugins> + </reporting> + + + <build> <resources> <resource> @@ -55,390 +55,395 @@ </includes> </resource> </resources> - <testResources> - <testResource> - <directory>src/test/resources</directory> - <filtering>true</filtering> - <includes> - <include>**/*.yaml</include> - <include>**/*.properties</include> - <include>**/*.xml</include> - </includes> - </testResource> - <testResource> - <directory>src/test/resources</directory> - <filtering>true</filtering> - <includes> - <include>largeentity.json</include> - </includes> - </testResource> - </testResources> - - <plugins> - - <!-- - Do not need to configure surefire plugin here, parent POM configuration is sufficient. 
- --> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <id>test-jar-execution</id> - <phase>package</phase> - <goals> - <goal>test-jar</goal> - </goals> - <configuration> - <includes> - <include>**/org/apache/usergrid/**</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - </build> - - <dependencies> - - - <!-- dependent on wiring guice and spring --> - <dependency> - <groupId>com.google.inject.extensions</groupId> - <artifactId>guice-spring</artifactId> - <version>4.0-beta5</version> - </dependency> - - <!-- Apache Dependencies --> - - <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-all</artifactId> - <!-- Exclude the old and problematic Snappy --> - <exclusions> - <exclusion> - <artifactId>snappy-java</artifactId> - <groupId>org.xerial.snappy</groupId> - </exclusion> - <exclusion> - <artifactId>antlr</artifactId> - <groupId>org.antlr</groupId> - </exclusion> - <exclusion> + <testResources> + <testResource> + <directory>src/test/resources</directory> + <filtering>true</filtering> + <includes> + <include>**/*.yaml</include> + <include>**/*.properties</include> + <include>**/*.xml</include> + </includes> + </testResource> + <testResource> + <directory>src/test/resources</directory> + <filtering>true</filtering> + <includes> + <include>largeentity.json</include> + </includes> + </testResource> + </testResources> + + <plugins> + + <!-- + Do not need to configure surefire plugin here, parent POM configuration is sufficient. 
+ --> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>test-jar-execution</id> + <phase>package</phase> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/org/apache/usergrid/**</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <dependencies> + + + <!-- dependent on wiring guice and spring --> + <dependency> + <groupId>com.google.inject.extensions</groupId> + <artifactId>guice-spring</artifactId> + <version>4.0-beta5</version> + </dependency> + + <!-- Apache Dependencies --> + + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-all</artifactId> + <!-- Exclude the old and problematic Snappy --> + <exclusions> + <exclusion> + <artifactId>snappy-java</artifactId> + <groupId>org.xerial.snappy</groupId> + </exclusion> + <exclusion> + <artifactId>antlr</artifactId> + <groupId>org.antlr</groupId> + </exclusion> + <exclusion> + <artifactId>netty</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>cassandra-thrift</artifactId> + </dependency> + + <!-- Include the slighly newer and less problematic snappy --> + <dependency> + <artifactId>snappy-java</artifactId> + <groupId>org.xerial.snappy</groupId> + <version>1.0.5</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + 
<artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + </dependency> + + <!-- SUN, Javax Package, and Other Commercial Dependencies --> + + <dependency> + <groupId>com.beust</groupId> + <artifactId>jcommander</artifactId> + </dependency> + + <dependency> + <groupId>com.sun.mail</groupId> + <artifactId>javax.mail</artifactId> + </dependency> + + <dependency> + <groupId>javax.activation</groupId> + <artifactId>activation</artifactId> + </dependency> + + <dependency> + <groupId>javax.persistence</groupId> + <artifactId>persistence-api</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.uuid</groupId> + <artifactId>java-uuid-generator</artifactId> + </dependency> + + <dependency> + <groupId>com.hazelcast</groupId> + <artifactId>hazelcast-all</artifactId> + </dependency> + + <dependency> + <groupId>com.netflix.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> + + <dependency> + <groupId>com.netflix.astyanax</groupId> + <artifactId>astyanax-recipes</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <!-- Codehaus, Spring and Other Org Dependencies --> + + <dependency> + <groupId>org.hectorclient</groupId> + <artifactId>hector-core</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-expression</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-aop</artifactId> + </dependency> + + <dependency> + 
<groupId>aopalliance</groupId> + <artifactId>aopalliance</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context-support</artifactId> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-web</artifactId> + </dependency> + + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + </dependency> + + <dependency> + <groupId>org.jsoup</groupId> + <artifactId>jsoup</artifactId> + </dependency> + + <dependency> + <groupId>org.perf4j</groupId> + <artifactId>perf4j</artifactId> + </dependency> + + <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjweaver</artifactId> + </dependency> + + <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjrt</artifactId> + </dependency> + + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + </dependency> + + <dependency> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + </dependency> + + <!-- + <dependency> + <groupId>org.jboss.netty</groupId> <artifactId>netty</artifactId> - <groupId>io.netty</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-thrift</artifactId> - </dependency> - - <!-- Include the slighly newer and less problematic snappy --> - <dependency> - <artifactId>snappy-java</artifactId> - <groupId>org.xerial.snappy</groupId> - <version>1.0.5</version> - </dependency> - - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - </dependency> - - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </dependency> - - <dependency> - 
<groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </dependency> - - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - - <dependency> - <groupId>commons-beanutils</groupId> - <artifactId>commons-beanutils</artifactId> - </dependency> - - <!-- SUN, Javax Package, and Other Commercial Dependencies --> - - <dependency> - <groupId>com.beust</groupId> - <artifactId>jcommander</artifactId> - </dependency> - - <dependency> - <groupId>com.sun.mail</groupId> - <artifactId>javax.mail</artifactId> - </dependency> - - <dependency> - <groupId>javax.activation</groupId> - <artifactId>activation</artifactId> - </dependency> - - <dependency> - <groupId>javax.persistence</groupId> - <artifactId>persistence-api</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.uuid</groupId> - <artifactId>java-uuid-generator</artifactId> - </dependency> - - <dependency> - <groupId>com.hazelcast</groupId> - <artifactId>hazelcast-all</artifactId> - </dependency> - - <dependency> - <groupId>com.netflix.curator</groupId> - <artifactId>curator-recipes</artifactId> - </dependency> - - <dependency> - <groupId>com.netflix.astyanax</groupId> - <artifactId>astyanax-recipes</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <!-- Codehaus, Spring and Other Org Dependencies --> - - <dependency> - <groupId>org.hectorclient</groupId> - <artifactId>hector-core</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-expression</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - </dependency> 
- - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-aop</artifactId> - </dependency> - - <dependency> - <groupId>aopalliance</groupId> - <artifactId>aopalliance</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context-support</artifactId> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-web</artifactId> - </dependency> - - <dependency> - <groupId>org.yaml</groupId> - <artifactId>snakeyaml</artifactId> - </dependency> - - <dependency> - <groupId>org.jsoup</groupId> - <artifactId>jsoup</artifactId> - </dependency> - - <dependency> - <groupId>org.perf4j</groupId> - <artifactId>perf4j</artifactId> - </dependency> - - <dependency> - <groupId>org.aspectj</groupId> - <artifactId>aspectjweaver</artifactId> - </dependency> - - <dependency> - <groupId>org.aspectj</groupId> - <artifactId>aspectjrt</artifactId> - </dependency> - - <dependency> - <groupId>cglib</groupId> - <artifactId>cglib-nodep</artifactId> - </dependency> - - <dependency> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - </dependency> - -<!-- - <dependency> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </dependency> ---> - - <!-- Test and Logging Dependencies --> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid-config</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <classifier>tests</classifier> - </dependency> - - <dependency> - <groupId>org.hectorclient</groupId> - <artifactId>hector-test</artifactId> - <scope>test</scope> - </dependency> - - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>${mockito.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.jvnet.mock-javamail</groupId> 
- <artifactId>mock-javamail</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>usergrid-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-test</artifactId> - <scope>test</scope> - </dependency> - - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>common</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - - <!-- Core Persistence deps --> - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>collection</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>jar</type> - <exclusions> - <exclusion> - <artifactId>antlr</artifactId> - <groupId>org.antlr</groupId> - </exclusion> - <exclusion> - <artifactId>antlr</artifactId> - <groupId>antlr</groupId> - </exclusion> - </exclusions> - </dependency> - - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>queryindex</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>jar</type> - </dependency> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>graph</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>jar</type> - </dependency> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>map</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>jar</type> - </dependency> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>cache</artifactId> - <version>2.1.1-SNAPSHOT</version> - </dependency> - - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>queue</artifactId> - <version>2.1.1-SNAPSHOT</version> - <type>jar</type> - </dependency> - - <dependency> - <groupId>com.codahale.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${metrics.version}</version> - </dependency> - - <dependency> - 
<groupId>com.codahale.metrics</groupId> - <artifactId>metrics-graphite</artifactId> - <version>${metrics.version}</version> - </dependency> - - </dependencies> - - <!-- - Do not need jacoco profile here because we do not override the surefire plugin in this POM - --> + </dependency> + --> + + <!-- Test and Logging Dependencies --> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>usergrid-config</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + + <dependency> + <groupId>org.hectorclient</groupId> + <artifactId>hector-test</artifactId> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.jvnet.mock-javamail</groupId> + <artifactId>mock-javamail</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>usergrid-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-test</artifactId> + <scope>test</scope> + </dependency> + + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + + <!-- Core Persistence deps --> + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>collection</artifactId> + <version>${project.version}</version> + <type>jar</type> + <exclusions> + <exclusion> + <artifactId>antlr</artifactId> + <groupId>org.antlr</groupId> + </exclusion> + <exclusion> + <artifactId>antlr</artifactId> + <groupId>antlr</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + 
<artifactId>queryindex</artifactId> + <version>${project.version}</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>graph</artifactId> + <version>${project.version}</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>map</artifactId> + <version>${project.version}</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>cache</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>actorsystem</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>queue</artifactId> + <version>${project.version}</version> + <type>jar</type> + </dependency> + + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>${metrics.version}</version> + </dependency> + + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + <version>${metrics.version}</version> + </dependency> + + </dependencies> + + <!-- + Do not need jacoco profile here because we do not override the surefire plugin in this POM + --> </project> http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index b0d3f59..6c06104 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -37,13 +37,13 @@ import 
org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.Query.Level; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.cassandra.*; import org.apache.usergrid.persistence.cassandra.util.TraceParticipant; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.FieldSet; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; -import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.*; @@ -105,7 +105,7 @@ public class CpEntityManager implements EntityManager { private final UUID applicationId; private final EntityManagerFig entityManagerFig; - private final AkkaFig akkaFig; + private final ActorSystemFig actorSystemFig; private Application application; @@ -171,7 +171,7 @@ public class CpEntityManager implements EntityManager { final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, - final AkkaFig akkaFig, + final ActorSystemFig actorSystemFig, final EntityManagerFig entityManagerFig, final GraphManagerFactory graphManagerFactory, final CollectionService collectionService, @@ -180,7 +180,7 @@ public class CpEntityManager implements EntityManager { final UUID applicationId ) { this.entityManagerFig = entityManagerFig; - this.akkaFig = akkaFig; + this.actorSystemFig = actorSystemFig; Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index bc1b335..99bf1e0 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -35,13 +35,13 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.locking.LockManager; import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.cassandra.CassandraService; import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.Setup; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; @@ -82,7 +82,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private static final Logger logger = LoggerFactory.getLogger( CpEntityManagerFactory.class ); private final EntityManagerFig entityManagerFig; - private final AkkaFig akkaFig; + private final ActorSystemFig 
actorSystemFig; private ApplicationContext applicationContext; @@ -125,7 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.injector = injector; this.reIndexService = injector.getInstance(ReIndexService.class); this.entityManagerFig = injector.getInstance(EntityManagerFig.class); - this.akkaFig = injector.getInstance( AkkaFig.class ); + this.actorSystemFig = injector.getInstance( ActorSystemFig.class ); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); this.indexService = injector.getInstance( AsyncEventService.class ); @@ -139,11 +139,11 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application logger.info("EntityManagerFactoring starting..."); - if ( akkaFig.getAkkaEnabled() ) { + if ( actorSystemFig.getAkkaEnabled() ) { try { logger.info("Akka cluster starting..."); this.uniqueValuesService = injector.getInstance( UniqueValuesService.class ); - this.uniqueValuesService.start(); + // TODO: this.uniqueValuesService.start(); } catch (Throwable t) { logger.error("Error starting Akka", t); throw t; @@ -360,7 +360,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application indexService, managerCache, metricsFactory, - akkaFig, + actorSystemFig, entityManagerFig, graphManagerFactory, collectionService, http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java index c6b3b15..ea16d8b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java @@ -28,10 +28,10 @@ import org.apache.usergrid.corepersistence.index.CollectionSettingsCacheFactory; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.Schema; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -61,7 +61,7 @@ public class ApplicationServiceImpl implements ApplicationService{ private final MapManagerFactory mapManagerFactory; private final GraphManagerFactory graphManagerFactory; private final CollectionSettingsCacheFactory collectionSettingsCacheFactory; - private final AkkaFig akkaFig; + private final ActorSystemFig actorSystemFig; @Inject @@ -72,7 +72,7 @@ public class ApplicationServiceImpl implements ApplicationService{ MapManagerFactory mapManagerFactory, GraphManagerFactory graphManagerFactory, CollectionSettingsCacheFactory collectionSettingsCacheFactory, - AkkaFig akkaFig + ActorSystemFig actorSystemFig ){ this.allEntityIdsObservable = allEntityIdsObservable; @@ -82,7 +82,7 @@ public class ApplicationServiceImpl implements ApplicationService{ this.mapManagerFactory = mapManagerFactory; this.graphManagerFactory = graphManagerFactory; this.collectionSettingsCacheFactory = collectionSettingsCacheFactory; - this.akkaFig = akkaFig; + this.actorSystemFig = actorSystemFig; } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/pom.xml b/stack/corepersistence/actorsystem/pom.xml new file mode 100644 index 0000000..85c0d60 --- /dev/null +++ b/stack/corepersistence/actorsystem/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>persistence</artifactId> + <groupId>org.apache.usergrid</groupId> + <version>2.1.1-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <name>Usergrid ActorSystem</name> + <artifactId>actorsystem</artifactId> + + <dependencies> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-tools_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-metrics_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-slf4j_2.11</artifactId> + <version>2.4.0</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>4.0</version> + </dependency> + + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + <version>${guice.version}</version> + </dependency> + + <dependency> + <groupId>org.safehaus.guicyfig</groupId> + <artifactId>guicyfig</artifactId> + <version>${guicyfig.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + 
</dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java new file mode 100644 index 0000000..7af510c --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.actorsystem; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + +import java.io.Serializable; + +@FigSingleton +public interface ActorSystemFig extends GuicyFig, Serializable { + + String AKKA_ENABLED = "collection.akka.enabled"; + + String AKKA_HOSTNAME = "collection.akka.hostname"; + + String AKKA_PORT = "collection.akka.port"; + + String AKKA_REGION = "collection.akka.region"; + + String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues + + String AKKA_REGION_SEEDS = "collection.akka.region.seeds"; + + String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors"; + + String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl"; + + String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl"; + + String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region"; + + /** + * Whether Akka is enabled; defaults to true. + */ + @Key(AKKA_ENABLED) + @Default("true") + boolean getAkkaEnabled(); + + /** + * Hostname to be used in Akka configuration. + */ + @Key(AKKA_HOSTNAME) + String getHostname(); + + /** + * Local port to be used in Akka configuration. + */ + @Key(AKKA_PORT) + int getPort(); + + /** + * Local region to be used in Akka configuration. + */ + @Key(AKKA_REGION) + String getRegion(); + + /** + * Comma-separated list of regions known to the cluster. + */ + @Key(AKKA_REGION_LIST) + String getRegionList(); + + /** + * Number of UniqueValueActors to be started on each node; defaults to 300. + */ + @Key(AKKA_UNIQUEVALUE_ACTORS) + @Default("300") + int getUniqueValueActors(); + + /** + * Comma-separated list of seeds, each with format {region}:{hostname}:{port}. 
+ * Regions MUST be listed in the 'usergrid.queue.regionList' property. + */ + @Key(AKKA_REGION_SEEDS) + String getRegionSeeds(); + + /** + * If no region is specified for a type, use the authoritative region. + */ + @Key(AKKA_AUTHORITATIVE_REGION) + String getAkkaAuthoritativeRegion(); + + /** + * Unique Value cache TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_CACHE_TTL) + @Default("10") + int getUniqueValueCacheTtl(); + + /** + * Unique Value Reservation TTL in seconds. + */ + @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL) + @Default("10") + int getUniqueValueReservationTtl(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java new file mode 100644 index 0000000..e2c2913 --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.actorsystem; + + +import akka.actor.ActorRef; + +public interface ActorSystemManager { + + void start(); + + void start(String hostname, Integer port, String currentRegion); + + void waitForRequestActors(); + + boolean isReady(); + + void registerRouterProducer( RouterProducer routerProducer ); + + void registerMessageType( Class messageType, String routerPath ); + + ActorRef getClientActor(String region ); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java new file mode 100644 index 0000000..0622b1b --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.actorsystem; + + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Singleton; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.*; +import java.util.concurrent.TimeUnit; + + +@Singleton +public class ActorSystemManagerImpl implements ActorSystemManager { + private static final Logger logger = LoggerFactory.getLogger( ActorSystemManagerImpl.class ); + + private String hostname; + private Integer port; + private String currentRegion; + + private static Injector injector; + private final ActorSystemFig actorSystemFig; + private final Map<String, ActorRef> requestActorsByRegion; + private final List<RouterProducer> routerProducers = new ArrayList<>(); + private final Map<Class, String> routersByMessageType = new HashMap<>(); + + + @Inject + public ActorSystemManagerImpl(Injector inj, ActorSystemFig actorSystemFig) { + injector = inj; + this.actorSystemFig = actorSystemFig; + this.requestActorsByRegion = new HashMap<>(); + } + + + /** + * Init Akka ActorSystems and wait for request actors to start. + */ + @Override + public void start() { + + this.hostname = actorSystemFig.getHostname(); + this.currentRegion = actorSystemFig.getRegion(); + this.port = null; + + initAkka(); + waitForRequestActors(); + } + + + /** + * For testing purposes only; does not wait for request actors to start. 
+ */ + @Override + public void start(String hostname, Integer port, String currentRegion) { + + this.hostname = hostname; + this.currentRegion = currentRegion; + this.port = port; + + initAkka(); + } + + + @Override + public boolean isReady() { + return !getRequestActorsByRegion().isEmpty(); + } + + + @Override + public void registerRouterProducer(RouterProducer routerProducer) { + routerProducers.add( routerProducer ); + } + + + @Override + public void registerMessageType(Class messageType, String routerPath) { + routersByMessageType.put( messageType, routerPath ); + } + + + @Override + public ActorRef getClientActor(String region) { + return getRequestActorsByRegion().get( region ); + } + + + private Map<String, ActorRef> getRequestActorsByRegion() { + return requestActorsByRegion; + } + + + private void initAkka() { + logger.info("Initializing Akka"); + + // Create one actor system with request actor for each region + + if ( StringUtils.isEmpty( hostname )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_HOSTNAME ); + } + + if ( StringUtils.isEmpty( currentRegion )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION ); + } + + if ( StringUtils.isEmpty( actorSystemFig.getRegionList() )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_LIST ); + } + + if ( StringUtils.isEmpty( actorSystemFig.getRegionSeeds() )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_SEEDS); + } + + if ( StringUtils.isEmpty( actorSystemFig.getAkkaAuthoritativeRegion() )) { + throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_AUTHORITATIVE_REGION); + } + + List regionList = Arrays.asList( actorSystemFig.getRegionList().toLowerCase().split(",") ); + + logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}", + hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() ); + + final 
Map<String, ActorSystem> systemMap = new HashMap<>(); + + Map<String, Config> configMap = readClusterSingletonConfigs(); + + ActorSystem localSystem = createClusterSingletonProxies( configMap, systemMap ); + + createRequestActors( systemMap ); + + for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.createLocalSystemActors( localSystem, systemMap ); + } + } + + + /** + * Read configuration and create a Config for each region. + * + * @return Map of regions to Configs. + */ + private Map<String, Config> readClusterSingletonConfigs() { + + Map<String, Config> configs = new HashMap<>(); + + ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create(); + + String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," ); + + logger.info("Found region {} seeds {}", regionSeeds.length, regionSeeds); + + try { + + if ( port != null ) { + + // we are testing, create seeds-by-region map for one region, one seed + + String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port; + seedsByRegion.put( currentRegion, seed ); + logger.info("Akka testing, only starting one seed"); + + } else { // create seeds-by-region map + + for (String regionSeed : regionSeeds) { + + String[] parts = regionSeed.split( ":" ); + String region = parts[0]; + String hostname = parts[1]; + String regionPortString = parts[2]; + + // all seeds in same region must use same port + // we assume 0th seed has the right port + final Integer regionPort; + + if (port == null) { + regionPort = Integer.parseInt( regionPortString ); + } else { + regionPort = port; // unless we are testing + } + + String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + regionPort; + + logger.info("Adding seed {} for region {}", seed, region ); + + seedsByRegion.put( region, seed ); + } + + if (seedsByRegion.keySet().isEmpty()) { + throw new RuntimeException( + "No seeds listed in 'parsing collection.akka.region.seeds' property." 
); + } + } + + int numInstancesPerNode = actorSystemFig.getUniqueValueActors(); + + // read config file once for each region + + for ( String region : seedsByRegion.keySet() ) { + + List<String> seeds = seedsByRegion.get( region ); + int lastColon = seeds.get(0).lastIndexOf(":") + 1; + final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon )); + + // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions + String clusterRole = currentRegion.equals( region ) ? "io" : "client"; + + logger.info( "Config for region {} is:\n" + + " Akka Hostname {}\n" + + " Akka Seeds {}\n" + + " Akka Port {}\n" + + " Akka UniqueValueActors per node {}\n" + + " Akka Authoritative Region {}", + region, hostname, seeds, port, numInstancesPerNode, actorSystemFig.getAkkaAuthoritativeRegion() ); + + Map<String, Object> configMap = new HashMap<String, Object>() {{ + put( "akka", new HashMap<String, Object>() {{ + put( "remote", new HashMap<String, Object>() {{ + put( "netty.tcp", new HashMap<String, Object>() {{ + put( "hostname", hostname ); + put( "bind-hostname", hostname ); + put( "port", regionPort ); + }} ); + }} ); + put( "cluster", new HashMap<String, Object>() {{ + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + put( "roles", Collections.singletonList(clusterRole) ); + put( "seed-nodes", new ArrayList<String>() {{ + for (String seed : seeds) { + add( seed ); + } + }} ); + }} ); + put( "actor", new HashMap<String, Object>() {{ + put( "deployment", new HashMap<String, Object>() {{ + put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{ + put( "cluster", new HashMap<String, Object>() {{ + //put( "roles", Collections.singletonList(role) ); + put( "max-nr-of-instances-per-node", numInstancesPerNode ); + }} ); + }} ); + }} ); + }} ); + }} ); + }}; + + Config config = ConfigFactory + .parseMap( configMap ) + .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) ) + .withFallback( 
ConfigFactory.load( "cluster-singleton" ) ); + + configs.put( region, config ); + } + + } catch ( Exception e ) { + throw new RuntimeException("Error 'parsing collection.akka.region.seeds' property", e ); + } + + return configs; + } + + + /** + * Create ActorSystem and ClusterSingletonProxy for every region. + * Create ClusterSingletonManager for the current region. + * + * @param configMap Configurations to be used to create ActorSystems + * @param systemMap Map of ActorSystems created by this method + * + * @return ActorSystem for this region. + */ + private ActorSystem createClusterSingletonProxies( + Map<String, Config> configMap, Map<String, ActorSystem> systemMap ) { + + ActorSystem localSystem = null; + + for ( String region : configMap.keySet() ) { + Config config = configMap.get( region ); + + ActorSystem system = ActorSystem.create( "ClusterSystem", config ); + systemMap.put( region, system ); + + // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions + if ( currentRegion.equals( region ) ) { + + localSystem = system; + + for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.createClusterSingletonManager( system ); + } + } + + for ( RouterProducer routerProducer : routerProducers ) { + routerProducer.createClusterSingletonProxy( system ); + } + } + + return localSystem; + } + + + /** + * Create RequestActor for each region. + * + * @param systemMap Map of regions to ActorSystems. 
+ */ + private void createRequestActors( Map<String, ActorSystem> systemMap ) { + + for ( String region : systemMap.keySet() ) { + + logger.info("Creating request actor for region {}", region); + + // Each RequestActor needs to know path to ClusterSingletonProxy and region + ActorRef requestActor = systemMap.get( region ).actorOf( + //Props.create( ClientActor.class, "/user/uvProxy" ), "requestActor" ); + Props.create( ClientActor.class, routersByMessageType ), "requestActor" ); + + requestActorsByRegion.put( region, requestActor ); + } + } + + + @Override + public void waitForRequestActors() { + + for ( String region : requestActorsByRegion.keySet() ) { + ActorRef ra = requestActorsByRegion.get( region ); + waitForRequestActor( ra ); + } + } + + + private void waitForRequestActor( ActorRef ra ) { + + logger.info( "Waiting on request actor {}...", ra.path() ); + + boolean started = false; + int retries = 0; + int maxRetries = 60; + while (retries < maxRetries) { + Timeout t = new Timeout( 10, TimeUnit.SECONDS ); + + Future<Object> fut = Patterns.ask( ra, new ClientActor.StatusRequest(), t ); + try { + ClientActor.StatusMessage result = (ClientActor.StatusMessage) Await.result( fut, t.duration() ); + + if (result.getStatus().equals( ClientActor.StatusMessage.Status.READY )) { + started = true; + break; + } + logger.info( "Waiting for request actor {} region {} ({}s)", ra.path(), currentRegion, retries ); + Thread.sleep( 1000 ); + + } catch (Exception e) { + logger.error( "Error: Timeout waiting for requestActor" ); + } + retries++; + } + + if (started) { + logger.info( "RequestActor has started" ); + } else { + throw new RuntimeException( "RequestActor did not start in time" ); + } + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemModule.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemModule.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemModule.java new file mode 100644 index 0000000..e501569 --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemModule.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.persistence.actorsystem; + +import com.google.inject.AbstractModule; +import org.safehaus.guicyfig.GuicyFigModule; + + +public class ActorSystemModule extends AbstractModule { + + @Override + protected void configure() { + + install( new GuicyFigModule( ActorSystemFig.class ) ); + + bind( ActorSystemManager.class ).to( ActorSystemManagerImpl.class ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java new file mode 100644 index 0000000..c495608 --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClientActor.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.usergrid.persistence.actorsystem; + +import akka.actor.ActorSelection; +import akka.actor.Address; +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent; +import akka.cluster.Member; +import akka.cluster.MemberStatus; +import org.apache.commons.lang.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; + + +/** + * Once notified of nodes, sends unique propertyValue requests to ClusterSingletonRouter via its local proxy. + */ +public class ClientActor extends UntypedActor { + private static final Logger logger = LoggerFactory.getLogger( ClientActor.class ); + + private final String name = RandomStringUtils.randomAlphanumeric( 4 ); + + private final Set<Address> nodes = new HashSet<>(); + private final Cluster cluster = Cluster.get(getContext().system()); + private final Map<Class, String> routersByMessageType; + + private boolean ready = false; + + + public ClientActor( Map<Class, String> routersByMessageType ) { + this.routersByMessageType = routersByMessageType; + } + + // subscribe to cluster changes, MemberEvent + @Override + public void preStart() { + logger.debug("{} role {} address {}:{} starting up, subscribing to cluster events...", name, + cluster.getSelfRoles().iterator().next(), + cluster.readView().selfAddress().host(), + cluster.readView().selfAddress().hostPort()); + cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class); + } + + // unsubscribe from cluster events when stopped + @Override + public void postStop() { + cluster.unsubscribe(getSelf()); + } + + @Override + public void onReceive(Object message) { + + int startSize = nodes.size(); + + String routerPath = routersByMessageType.get( message.getClass() ); + + if ( routerPath != null && ready ) { + + // just pick any node, the ClusterSingletonRouter will do the consistent hash 
routing + List<Address> nodesList = new ArrayList<>( nodes ); + Address address = nodesList.get( ThreadLocalRandom.current().nextInt( nodesList.size() ) ); + ActorSelection service = getContext().actorSelection( address + routerPath ); + service.tell( message, getSender() ); + + } else if ( routerPath != null && !ready ) { + + logger.debug("{} responding with status unknown", name); + getSender().tell( new ErrorResponse("ClientActor not ready"), getSender() ); + + } else if ( message instanceof StatusRequest ) { + if ( ready ) { + getSender().tell( new StatusMessage( name, StatusMessage.Status.READY ), getSender() ); + } else { + getSender().tell( new StatusMessage( name, StatusMessage.Status.INITIALIZING), getSender() ); + } + return; + + } else { + processAsClusterEvent( message ); + } + + if ( logger.isDebugEnabled() && startSize != nodes.size() ) { + logger.debug( "{} now knows {} nodes", name, nodes.size() ); + } + + if (!nodes.isEmpty() && !ready) { + logger.debug( name + " is ready" ); + ready = true; + + } else if (nodes.isEmpty() && ready) { + ready = false; + } + } + + /** + * Process messages about nodes up, down, reachable and unreachable. 
+ */ + private void processAsClusterEvent(Object message) { + + if (message instanceof ClusterEvent.CurrentClusterState) { + ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message; + nodes.clear(); + for (Member member : state.getMembers()) { + if (member.hasRole("io") && member.status().equals( MemberStatus.up())) { + nodes.add(member.address()); + logger.debug("RequestActor {} received cluster-state member-up for {}", name, member.address()); + } + } + + } else if (message instanceof ClusterEvent.MemberUp) { + ClusterEvent.MemberUp mUp = (ClusterEvent.MemberUp) message; + if (mUp.member().hasRole("io")) { + nodes.add( mUp.member().address() ); + } + logger.debug("{} received member-up for {}", name, mUp.member().address()); + + } else if (message instanceof ClusterEvent.MemberEvent) { + ClusterEvent.MemberEvent other = (ClusterEvent.MemberEvent) message; + nodes.remove(other.member().address()); + + } else if (message instanceof ClusterEvent.UnreachableMember) { + ClusterEvent.UnreachableMember unreachable = (ClusterEvent.UnreachableMember) message; + nodes.remove(unreachable.member().address()); + logger.debug("{} received un-reachable for {}", name, unreachable.member().address()); + + } else if (message instanceof ClusterEvent.ReachableMember) { + ClusterEvent.ReachableMember reachable = (ClusterEvent.ReachableMember) message; + if (reachable.member().hasRole("io")) { + nodes.add( reachable.member().address() ); + } + logger.debug("{} received reachable for {}", name, reachable.member().address()); + + } else { + logger.error("{}: unhandled message: {}", name, message.toString()); + unhandled(message); + } + } + + /** + * RequestActor responds to StatusRequests. + */ + public static class StatusRequest implements Serializable { } + + /** + * RequestActor responds with, and sometimes unilaterally sends StatusMessages. 
+ */ + public static class StatusMessage implements Serializable { + final String name; + + public Status getStatus() { + return status; + } + + public enum Status { INITIALIZING, READY } + private final Status status; + public StatusMessage(String name, Status status) { + this.name = name; + this.status = status; + } + public String getName() { + return name; + } + public boolean isReady() { + return status.equals( Status.READY ); + } + } + + + public static class ErrorResponse implements Serializable { + private String message; + public ErrorResponse( String message ) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } + +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java new file mode 100644 index 0000000..9304c4c --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.actorsystem; + +import akka.actor.Actor; +import akka.actor.IndirectActorProducer; +import com.google.inject.Injector; + + +public class GuiceActorProducer implements IndirectActorProducer { + + final Injector injector; + final Class<? extends Actor> actorClass; + + public GuiceActorProducer(Injector injector, Class<? extends Actor> actorClass) { + this.injector = injector; + this.actorClass = actorClass; + } + + @Override + public Class<? extends Actor> actorClass() { + return actorClass; + } + + @Override + public Actor produce() { + return injector.getInstance( actorClass ); + } +} + http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java new file mode 100644 index 0000000..ac2c7ee --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.actorsystem; + +import akka.actor.ActorSystem; + +import java.util.Map; + + +public interface RouterProducer { + + /** + * Create cluster singleton manager for the current region. + * Will be called once per router per JVM. + */ + void createClusterSingletonManager( ActorSystem system ); + + /** + * Create cluster singleton proxy for a region. + * Will be called once per router per JVM per region. + */ + void createClusterSingletonProxy( ActorSystem system ); + + /** + * Create other actors needed to support the router produced by the implementation. 
+ */ + void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf new file mode 100644 index 0000000..93854f9 --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf @@ -0,0 +1,28 @@ +akka { + + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "ERROR" + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + + actor { + provider = "akka.cluster.ClusterActorRefProvider" + } + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } +} + +# Disable legacy metrics in akka-cluster. +akka.cluster.metrics.enabled=off + +# Enable metrics extension in akka-cluster-metrics. +akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"] + +# Sigar native library extract location during tests. +# Note: use per-jvm-instance folder when running multiple jvm on one host. 
+akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf b/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf new file mode 100644 index 0000000..907aebb --- /dev/null +++ b/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf @@ -0,0 +1,25 @@ +include "application" + +akka.actor.deployment { + /uvRouter/singleton/router { + router = consistent-hashing-pool + cluster { + enabled = on + allow-local-routees = on + + # singleton will only run on nodes with role "io" + use-role = io + + # more forgiving failure detector + failure-detector { + threshold = 10 + acceptable-heartbeat-pause = 3 s + heartbeat-interval = 1 s + heartbeat-request { + expected-response-after = 3 s + } + } + + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java new file mode 100644 index 0000000..a12c5e1 --- /dev/null +++ b/stack/corepersistence/actorsystem/src/test/java/org/apache/usergrid/persistence/actorsystem/ActorServiceServiceTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.actorsystem; + +import com.google.inject.Inject; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + + +@RunWith( ITRunner.class ) +@UseModules( ActorSystemModule.class ) +public class ActorServiceServiceTest { + private static final Logger logger = LoggerFactory.getLogger( ActorServiceServiceTest.class ); + + @Inject + ActorSystemFig actorSystemFig; + + private static AtomicBoolean startedAkka = new AtomicBoolean( false ); + + + @Before + public void initAkka() { + if ( !startedAkka.getAndSet( true ) ) { + } + } + + + @Test + public void testBasicOperation() throws Exception { + initAkka(); + } + + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/collection/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml index ad9cefd..88b96b1 100644 --- a/stack/corepersistence/collection/pom.xml +++ 
b/stack/corepersistence/collection/pom.xml @@ -1,4 +1,20 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -26,6 +42,12 @@ </dependency> <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>actorsystem</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.11</artifactId> <version>2.4.0</version> http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java index b6056b5..dff4a12 100644 --- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java @@ -126,10 +126,4 @@ public interface EntityCollectionManager { * Returns health of entity data store. */ Health getHealth(); - - /** - * For testing purposes only. - */ - void startAkkaForTesting( String hostname, int port, String region ); - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java index 45e519e..ae73e47 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.guice; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.usergrid.persistence.actorsystem.ActorSystemFig; +import org.apache.usergrid.persistence.actorsystem.ActorSystemModule; import org.apache.usergrid.persistence.collection.uniquevalues.*; import org.safehaus.guicyfig.GuicyFigModule; @@ -55,11 +57,11 @@ public abstract class CollectionModule extends AbstractModule { protected void configure() { // noinspection unchecked - install( new GuicyFigModule( AkkaFig.class ) ); install( new GuicyFigModule( SerializationFig.class ) ); install( new GuicyFigModule( CollectionSchedulerFig.class ) ); install( new SerializationModule() ); 
install( new ServiceModule() ); + install( new ActorSystemModule() ); // users of this module can add their own implemementations // create a guice factor for getting our collection manager http://git-wip-us.apache.org/repos/asf/usergrid/blob/ee18950f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 18a07ac..3877fe3 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.collection.impl; import java.util.concurrent.ExecutionException; +import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.cache.EntityCacheFig; @@ -76,6 +77,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final Keyspace keyspace; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; + private final ActorSystemManager actorSystemManager; private final UniqueValuesService uniqueValuesService; @@ -101,6 +103,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag metricsFactory, serializationFig, rxTaskScheduler, + actorSystemManager, 
uniqueValuesService, scope ); @@ -128,6 +131,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag final EntityCacheFig entityCacheFig, final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler, + final ActorSystemManager actorSystemManager, final UniqueValuesService uniqueValuesService ) { this.writeStart = writeStart; @@ -146,6 +150,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.keyspace = keyspace; this.metricsFactory = metricsFactory; this.rxTaskScheduler = rxTaskScheduler; + this.actorSystemManager = actorSystemManager; this.uniqueValuesService = uniqueValuesService; }
